This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7ff7affc3c [ASTERIXDB-3229][EXT] Part 5: Introduce external filter
interfaces
7ff7affc3c is described below
commit 7ff7affc3ce28b8d0da5a7953f0037d39c27f712
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Fri Aug 4 18:07:12 2023 -0700
[ASTERIXDB-3229][EXT] Part 5: Introduce external filter interfaces
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
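A preparatory patch for dynamic prefixes: introduces the external filter
evaluator interfaces (IExternalFilterEvaluator and
IExternalFilterEvaluatorFactory) with no-op implementations, and adds an
IExternalFilterEvaluatorFactory parameter to the adapter-factory and
data-source-factory configure() methods.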
Change-Id: Icaa3958ac1af11ef5f63ba125b7ce5858b04112c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17696
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Hussain Towaileb <[email protected]>
---
.../asterix/common/external/IAdapterFactory.java | 8 ++--
.../common/external/IExternalFilterEvaluator.java | 31 ++++++++++++++
.../external/IExternalFilterEvaluatorFactory.java | 30 ++++++++++++++
.../external/NoOpExternalFilterEvaluator.java | 48 ++++++++++++++++++++++
.../NoOpExternalFilterEvaluatorFactory.java | 37 +++++++++++++++++
.../adapter/factory/ExternalAdapterFactory.java | 7 +++-
.../adapter/factory/GenericAdapterFactory.java | 16 ++++----
.../external/api/IExternalDataSourceFactory.java | 6 ++-
.../external/input/HDFSDataSourceFactory.java | 5 ++-
.../AbstractExternalInputStreamFactory.java | 5 ++-
.../record/reader/aws/AwsS3InputStreamFactory.java | 7 ++--
.../record/reader/aws/AwsS3ReaderFactory.java | 7 ++--
.../aws/parquet/AwsS3ParquetReaderFactory.java | 4 +-
.../azure/blob/AzureBlobInputStreamFactory.java | 7 ++--
.../reader/azure/blob/AzureBlobReaderFactory.java | 7 ++--
.../datalake/AzureDataLakeInputStreamFactory.java | 7 ++--
.../azure/datalake/AzureDataLakeReaderFactory.java | 7 ++--
.../parquet/AzureBlobParquetReaderFactory.java | 4 +-
.../parquet/AzureDataLakeParquetReaderFactory.java | 4 +-
.../record/reader/gcs/GCSInputStreamFactory.java | 13 +++---
.../input/record/reader/gcs/GCSReaderFactory.java | 7 ++--
.../gcs/parquet/GCSParquetReaderFactory.java | 4 +-
.../reader/http/HttpServerRecordReaderFactory.java | 5 ++-
.../record/reader/rss/RSSRecordReaderFactory.java | 3 +-
.../reader/stream/StreamRecordReaderFactory.java | 6 ++-
.../reader/twitter/TwitterRecordReaderFactory.java | 4 +-
.../stream/factory/LocalFSInputStreamFactory.java | 4 +-
.../factory/SocketClientInputStreamFactory.java | 4 +-
.../factory/SocketServerInputStreamFactory.java | 4 +-
.../factory/TwitterFirehoseStreamFactory.java | 3 +-
.../operators/FeedIntakeOperatorDescriptor.java | 20 ++++++---
.../external/provider/AdapterFactoryProvider.java | 6 ++-
.../reader/RecordWithPKTestReaderFactory.java | 3 +-
.../record/reader/kv/KVTestReaderFactory.java | 3 +-
.../library/adapter/TestTypedAdapterFactory.java | 3 +-
.../metadata/declared/DatasetDataSource.java | 4 +-
.../declared/FunctionDataSourceFactory.java | 5 ++-
.../metadata/declared/LoadableDataSource.java | 4 +-
.../metadata/declared/MetadataProvider.java | 7 ++--
.../asterix/metadata/feeds/FeedMetadataUtil.java | 10 +++--
40 files changed, 290 insertions(+), 79 deletions(-)
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
index 65b415d412..a8baf48102 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
@@ -60,7 +60,7 @@ public interface IAdapterFactory extends Serializable {
/**
* Creates an instance of IDatasourceAdapter.
*
- * @param ctx HyracksTaskContext
+ * @param ctx HyracksTaskContext
* @param partition partition number
* @return An instance of IDatasourceAdapter.
* @throws Exception
@@ -72,10 +72,12 @@ public interface IAdapterFactory extends Serializable {
*
* @param serviceContext
* @param configuration
- * @param warningCollector warning collector
+ * @param warningCollector warning collector
+ * @param filterEvaluatorFactory
* @throws AlgebricksException
* @throws HyracksDataException
*/
void configure(ICCServiceContext serviceContext, Map<String, String>
configuration,
- IWarningCollector warningCollector) throws HyracksDataException,
AlgebricksException;
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws HyracksDataException, AlgebricksException;
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
new file mode 100644
index 0000000000..22cd20a8bb
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IExternalFilterEvaluator {
+ boolean isEmpty();
+
+ boolean isComputedFieldUsed(int index);
+
+ void setValue(int index, String stringValue) throws HyracksDataException;
+
+ boolean evaluate() throws HyracksDataException;
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluatorFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluatorFactory.java
new file mode 100644
index 0000000000..38a38a69ae
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluatorFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public interface IExternalFilterEvaluatorFactory extends Serializable {
+ IExternalFilterEvaluator create(IServiceContext serviceContext,
IWarningCollector warningCollector)
+ throws HyracksDataException;
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/NoOpExternalFilterEvaluator.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/NoOpExternalFilterEvaluator.java
new file mode 100644
index 0000000000..78ebeb4d90
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/NoOpExternalFilterEvaluator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+class NoOpExternalFilterEvaluator implements IExternalFilterEvaluator {
+ static final IExternalFilterEvaluator INSTANCE = new
NoOpExternalFilterEvaluator();
+
+ private NoOpExternalFilterEvaluator() {
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return true;
+ }
+
+ @Override
+ public boolean isComputedFieldUsed(int index) {
+ return false;
+ }
+
+ @Override
+ public void setValue(int index, String stringValue) throws
HyracksDataException {
+ throw new IndexOutOfBoundsException("Number of paths is 0");
+ }
+
+ @Override
+ public boolean evaluate() throws HyracksDataException {
+ return true;
+ }
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/NoOpExternalFilterEvaluatorFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/NoOpExternalFilterEvaluatorFactory.java
new file mode 100644
index 0000000000..4b5bebb282
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/NoOpExternalFilterEvaluatorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public class NoOpExternalFilterEvaluatorFactory implements
IExternalFilterEvaluatorFactory {
+ public static final IExternalFilterEvaluatorFactory INSTANCE = new
NoOpExternalFilterEvaluatorFactory();
+ private static final long serialVersionUID = 6470035020297216949L;
+
+ private NoOpExternalFilterEvaluatorFactory() {
+ }
+
+ @Override
+ public IExternalFilterEvaluator create(IServiceContext serviceContext,
IWarningCollector warningCollector)
+ throws HyracksDataException {
+ return NoOpExternalFilterEvaluator.INSTANCE;
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/ExternalAdapterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/ExternalAdapterFactory.java
index 45c4b123d4..9741bb14be 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/ExternalAdapterFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/ExternalAdapterFactory.java
@@ -24,6 +24,8 @@ import java.util.Map;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.library.ILibrary;
import org.apache.asterix.common.library.ILibraryManager;
@@ -66,7 +68,7 @@ public final class ExternalAdapterFactory implements
ITypedAdapterFactory {
@Override
public void configure(ICCServiceContext serviceContext, Map<String,
String> configuration,
- IWarningCollector warningCollector) {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory) {
this.serviceContext = serviceContext;
this.configuration = configuration;
}
@@ -92,7 +94,8 @@ public final class ExternalAdapterFactory implements
ITypedAdapterFactory {
ITypedAdapterFactory adapterFactory = (ITypedAdapterFactory)
cl.loadClass(className).newInstance();
adapterFactory.setOutputType(outputType);
adapterFactory.setMetaType(metaType);
- adapterFactory.configure(null, configuration,
ctx.getWarningCollector());
+ adapterFactory.configure(null, configuration,
ctx.getWarningCollector(),
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
return adapterFactory.createAdapter(ctx, partition);
} catch (InstantiationException | IllegalAccessException |
ClassNotFoundException | AlgebricksException e) {
throw HyracksDataException.create(e);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index fb08586595..89ee37be9c 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -25,6 +25,8 @@ import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.api.IDataParserFactory;
@@ -118,7 +120,8 @@ public class GenericAdapterFactory implements
ITypedAdapterFactory {
if (dataSourceFactory == null) {
dataSourceFactory = createExternalDataSourceFactory(configuration);
// create and configure parser factory
- dataSourceFactory.configure(serviceContext, configuration,
warningCollector);
+ dataSourceFactory.configure(serviceContext, configuration,
warningCollector,
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
}
if (dataParserFactory == null) {
// create and configure parser factory
@@ -131,12 +134,13 @@ public class GenericAdapterFactory implements
ITypedAdapterFactory {
@Override
public void configure(ICCServiceContext serviceContext, Map<String,
String> configuration,
- IWarningCollector warningCollector) throws HyracksDataException,
AlgebricksException {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws HyracksDataException, AlgebricksException {
this.configuration = configuration;
ICcApplicationContext appCtx = (ICcApplicationContext)
serviceContext.getApplicationContext();
ExternalDataUtils.validateDataSourceParameters(configuration);
dataSourceFactory = createExternalDataSourceFactory(configuration);
- dataSourceFactory.configure(serviceContext, configuration,
warningCollector);
+ dataSourceFactory.configure(serviceContext, configuration,
warningCollector, filterEvaluatorFactory);
ExternalDataUtils.validateDataParserParameters(configuration);
dataParserFactory = createDataParserFactory(configuration);
dataParserFactory.setRecordType(recordType);
@@ -199,10 +203,8 @@ public class GenericAdapterFactory implements
ITypedAdapterFactory {
/**
* Use pre-configured datasource factory For function datasources
*
- * @param dataSourceFactory
- * the function datasource factory
- * @param dataParserFactory
- * the function data parser factory
+ * @param dataSourceFactory the function datasource factory
+ * @param dataParserFactory the function data parser factory
* @throws AlgebricksException
*/
public void configure(IExternalDataSourceFactory dataSourceFactory,
IDataParserFactory dataParserFactory)
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index e5c4b3fcc4..d628bc74c5 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -27,6 +27,7 @@ import java.util.Set;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
@@ -62,10 +63,11 @@ public interface IExternalDataSourceFactory extends
Serializable {
* submitted AQL statement and any additional pairs added by the compiler
*
* @param configuration
+ * @param filterEvaluatorFactory
* @throws AsterixException
*/
- void configure(IServiceContext ctx, Map<String, String> configuration,
IWarningCollector warningCollector)
- throws AlgebricksException, HyracksDataException;
+ void configure(IServiceContext ctx, Map<String, String> configuration,
IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException, HyracksDataException;
/**
* returns the passed partition constraints if not null, otherwise returns
round robin absolute partition
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index f22d12887f..d7093b990e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -31,6 +31,7 @@ import java.util.Map;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
@@ -76,7 +77,6 @@ public class HDFSDataSourceFactory implements
IRecordReaderFactory<Object>, IExt
protected static Object initLock = new Object();
protected Map<String, String> configuration;
protected Class<?> recordClass;
- protected boolean indexingOp = false;
private JobConf conf;
private InputSplit[] inputSplits;
private String nodeName;
@@ -84,7 +84,8 @@ public class HDFSDataSourceFactory implements
IRecordReaderFactory<Object>, IExt
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
- IWarningCollector warningCollector) throws AlgebricksException,
HyracksDataException {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AlgebricksException, HyracksDataException {
JobConf hdfsConf = createHdfsConf(serviceCtx, configuration);
configureHdfsConf(hdfsConf, configuration);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index 7313b31078..ff6b03eb8b 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -26,6 +26,7 @@ import java.util.function.BiPredicate;
import java.util.regex.Matcher;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -62,8 +63,8 @@ public abstract class AbstractExternalInputStreamFactory
implements IInputStream
}
@Override
- public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector)
- throws AlgebricksException {
+ public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException {
this.configuration = configuration;
this.partitionConstraint =
((ICcApplicationContext)
ctx.getApplicationContext()).getClusterStateManager().getClusterLocations();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 3043f7ac7a..4fc63c6e9c 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.Supplier;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -51,9 +52,9 @@ public class AwsS3InputStreamFactory extends
AbstractExternalInputStreamFactory
}
@Override
- public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector)
- throws AlgebricksException {
- super.configure(ctx, configuration, warningCollector);
+ public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException {
+ super.configure(ctx, configuration, warningCollector,
filterEvaluatorFactory);
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
index 1dd0c8bfe1..6a21c79470 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -59,13 +60,13 @@ public class AwsS3ReaderFactory extends
StreamRecordReaderFactory {
}
@Override
- public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector)
- throws AlgebricksException, HyracksDataException {
+ public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException, HyracksDataException {
this.configuration = configuration;
// Stream factory
streamFactory = new AwsS3InputStreamFactory();
- streamFactory.configure(ctx, configuration, warningCollector);
+ streamFactory.configure(ctx, configuration, warningCollector,
filterEvaluatorFactory);
// record reader
recordReaderClazz =
StreamRecordReaderProvider.getRecordReaderClazz(configuration);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 93f1e69330..66312bb173 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -53,7 +54,8 @@ public class AwsS3ParquetReaderFactory extends
HDFSDataSourceFactory {
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
- IWarningCollector warningCollector) throws AlgebricksException,
HyracksDataException {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AlgebricksException, HyracksDataException {
//Get path
String path = configuration.containsKey(ExternalDataConstants.KEY_PATH)
? configuration.get(ExternalDataConstants.KEY_PATH) :
buildPathURIs(configuration, warningCollector);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
index 55c05218fb..b18b655211 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.PriorityQueue;
import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -52,9 +53,9 @@ public class AzureBlobInputStreamFactory extends
AbstractExternalInputStreamFact
}
@Override
- public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector)
- throws AlgebricksException {
- super.configure(ctx, configuration, warningCollector);
+ public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException {
+ super.configure(ctx, configuration, warningCollector,
filterEvaluatorFactory);
IApplicationContext appCtx = (IApplicationContext)
ctx.getApplicationContext();
// Ensure the validity of include/exclude
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobReaderFactory.java
index 525ee633de..0f4d6badbf 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobReaderFactory.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -59,13 +60,13 @@ public class AzureBlobReaderFactory extends
StreamRecordReaderFactory {
}
@Override
- public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector)
- throws AlgebricksException, HyracksDataException {
+ public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException, HyracksDataException {
this.configuration = configuration;
// Stream factory
streamFactory = new AzureBlobInputStreamFactory();
- streamFactory.configure(ctx, configuration, warningCollector);
+ streamFactory.configure(ctx, configuration, warningCollector,
filterEvaluatorFactory);
// record reader
recordReaderClazz =
StreamRecordReaderProvider.getRecordReaderClazz(configuration);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
index 929cb6e495..35c3648b72 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.PriorityQueue;
import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -52,9 +53,9 @@ public class AzureDataLakeInputStreamFactory extends
AbstractExternalInputStream
}
@Override
- public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector)
- throws AlgebricksException {
- super.configure(ctx, configuration, warningCollector);
+ public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException {
+ super.configure(ctx, configuration, warningCollector,
filterEvaluatorFactory);
IApplicationContext appCtx = (IApplicationContext)
ctx.getApplicationContext();
// Ensure the validity of include/exclude
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java
index 594bacfa1f..6f4685c0eb 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -59,13 +60,13 @@ public class AzureDataLakeReaderFactory extends
StreamRecordReaderFactory {
}
@Override
- public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector)
- throws AlgebricksException, HyracksDataException {
+ public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException, HyracksDataException {
this.configuration = configuration;
// Stream factory
streamFactory = new AzureDataLakeInputStreamFactory();
- streamFactory.configure(ctx, configuration, warningCollector);
+ streamFactory.configure(ctx, configuration, warningCollector,
filterEvaluatorFactory);
// record reader
recordReaderClazz =
StreamRecordReaderProvider.getRecordReaderClazz(configuration);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index e08013c33b..927e74ee2c 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -30,6 +30,7 @@ import java.util.Set;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -50,7 +51,8 @@ public class AzureBlobParquetReaderFactory extends
HDFSDataSourceFactory {
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
- IWarningCollector warningCollector) throws AlgebricksException,
HyracksDataException {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AlgebricksException, HyracksDataException {
IApplicationContext appCtx = (IApplicationContext)
serviceCtx.getApplicationContext();
BlobServiceClient blobServiceClient = buildAzureBlobClient(appCtx,
configuration);
//Get endpoint
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
index c98fc8b695..3ef9c56280 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
@@ -30,6 +30,7 @@ import java.util.Set;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -50,7 +51,8 @@ public class AzureDataLakeParquetReaderFactory extends
HDFSDataSourceFactory {
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
- IWarningCollector warningCollector) throws AlgebricksException,
HyracksDataException {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AlgebricksException, HyracksDataException {
IApplicationContext appCtx = (IApplicationContext)
serviceCtx.getApplicationContext();
DataLakeServiceClient dataLakeServiceClient =
buildAzureDatalakeClient(appCtx, configuration);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
index 278c1ad9b7..433fecdcab 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -47,9 +48,9 @@ public class GCSInputStreamFactory extends
AbstractExternalInputStreamFactory {
}
@Override
- public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector)
- throws AlgebricksException {
- super.configure(ctx, configuration, warningCollector);
+ public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException {
+ super.configure(ctx, configuration, warningCollector,
filterEvaluatorFactory);
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
@@ -65,15 +66,15 @@ public class GCSInputStreamFactory extends
AbstractExternalInputStreamFactory {
/**
* To efficiently utilize the parallelism, work load will be distributed
amongst the partitions based on the file
* size.
- *
+ * <p>
* Example:
* File1 1mb, File2 300kb, File3 300kb, File4 300kb
- *
+ * <p>
* Distribution:
* Partition1: [File1]
* Partition2: [File2, File3, File4]
*
- * @param items items
+ * @param items items
* @param partitionsCount Partitions count
*/
private void distributeWorkLoad(List<Blob> items, int partitionsCount) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSReaderFactory.java
index ca42892def..981a29d5fa 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSReaderFactory.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -59,13 +60,13 @@ public class GCSReaderFactory extends
StreamRecordReaderFactory {
}
@Override
- public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector)
- throws AlgebricksException, HyracksDataException {
+ public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException, HyracksDataException {
this.configuration = configuration;
// Stream factory
streamFactory = new GCSInputStreamFactory();
- streamFactory.configure(ctx, configuration, warningCollector);
+ streamFactory.configure(ctx, configuration, warningCollector,
filterEvaluatorFactory);
// record reader
recordReaderClazz =
StreamRecordReaderProvider.getRecordReaderClazz(configuration);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
index 28874153a6..1de944b09f 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -45,7 +46,8 @@ public class GCSParquetReaderFactory extends
HDFSDataSourceFactory {
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
- IWarningCollector warningCollector) throws AlgebricksException,
HyracksDataException {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AlgebricksException, HyracksDataException {
// get path
String path = buildPathURIs(configuration, warningCollector);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
index 5954d74b44..b5ade310fe 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -87,8 +88,8 @@ public class HttpServerRecordReaderFactory implements
IRecordReaderFactory<char[
}
@Override
- public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector)
- throws AlgebricksException {
+ public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException {
this.configurations = configuration;
// necessary configs
addrValue = getConfigurationValue(KEY_CONFIGURATION_ADDRESSES, true);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index 1a5d2a25d9..0026f0722b 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -62,7 +63,7 @@ public class RSSRecordReaderFactory implements
IRecordReaderFactory<SyndEntry> {
@Override
public void configure(IServiceContext serviceContext, Map<String, String>
configuration,
- IWarningCollector warningCollector) {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory) {
this.serviceContext = serviceContext;
String url = configuration.get(ExternalDataConstants.KEY_RSS_URL);
if (url == null) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index c0a1b3806e..d9bbd01c38 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -83,10 +84,11 @@ public class StreamRecordReaderFactory implements
IRecordReaderFactory<char[]> {
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
- IWarningCollector warningCollector) throws HyracksDataException,
AlgebricksException {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws HyracksDataException, AlgebricksException {
this.configuration = configuration;
configureInputStreamFactory(configuration);
- streamFactory.configure(serviceCtx, configuration, warningCollector);
+ streamFactory.configure(serviceCtx, configuration, warningCollector,
filterEvaluatorFactory);
recordReaderClazz =
StreamRecordReaderProvider.getRecordReaderClazz(configuration);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 52d8c03ed5..ef55d4af32 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -80,7 +81,8 @@ public class TwitterRecordReaderFactory implements
IRecordReaderFactory<char[]>
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
- IWarningCollector warningCollector) throws AsterixException {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AsterixException {
try {
Class.forName("twitter4j.Twitter");
} catch (ClassNotFoundException e) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index cde0266a38..4f07a78466 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.INodeResolver;
@@ -87,7 +88,8 @@ public class LocalFSInputStreamFactory implements
IInputStreamFactory {
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
- IWarningCollector warningCollector) throws AsterixException {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AsterixException {
this.configuration = configuration;
String[] splits =
configuration.get(ExternalDataConstants.KEY_PATH).split(",");
if (inputFileSplits == null) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
index 5d2b2a6e50..ceabbfc833 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IInputStreamFactory;
@@ -57,7 +58,8 @@ public class SocketClientInputStreamFactory implements
IInputStreamFactory {
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
- IWarningCollector warningCollector) throws AsterixException {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AsterixException {
try {
this.serviceCtx = serviceCtx;
this.sockets = new ArrayList<>();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index d628062401..c8da459e82 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.input.stream.SocketServerInputStream;
@@ -45,7 +46,8 @@ public class SocketServerInputStreamFactory implements
IInputStreamFactory {
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
- IWarningCollector warningCollector) throws CompilationException {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws CompilationException {
try {
sockets =
FeedUtils.extractHostsPorts(configuration.get(ExternalDataConstants.KEY_MODE),
serviceCtx,
configuration.get(ExternalDataConstants.KEY_SOCKETS));
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
index ab7fa777f6..7a807a4c2d 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
@@ -86,7 +87,7 @@ public class TwitterFirehoseStreamFactory implements
IInputStreamFactory {
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
- IWarningCollector warningCollector) {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory) {
this.serviceCtx = serviceCtx;
this.configuration = configuration;
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 27ae55cecb..c70cff4792 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.asterix.active.EntityId;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.library.ILibrary;
import org.apache.asterix.common.library.ILibraryManager;
@@ -55,14 +56,20 @@ public class FeedIntakeOperatorDescriptor extends
AbstractSingleActivityOperator
private static final Logger LOGGER = LogManager.getLogger();
- /** The unique identifier of the feed that is being ingested. **/
+ /**
+ * The unique identifier of the feed that is being ingested.
+ **/
private final EntityId feedId;
private final FeedPolicyAccessor policyAccessor;
private final ARecordType adapterOutputType;
- /** The adaptor factory that is used to create an instance of the feed
adaptor **/
+ /**
+ * The adaptor factory that is used to create an instance of the feed
adaptor
+ **/
private ITypedAdapterFactory adaptorFactory;
- /** The library that contains the adapter in use. **/
+ /**
+ * The library that contains the adapter in use.
+ **/
private DataverseName adaptorLibraryDataverse;
private String adaptorLibraryName;
/**
@@ -70,7 +77,9 @@ public class FeedIntakeOperatorDescriptor extends
AbstractSingleActivityOperator
* This value is used only in the case of external adapters.
**/
private String adaptorFactoryClassName;
- /** The configuration parameters associated with the adapter. **/
+ /**
+ * The configuration parameters associated with the adapter.
+ **/
private Map<String, String> adaptorConfiguration;
public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed
primaryFeed, ITypedAdapterFactory adapterFactory,
@@ -120,7 +129,8 @@ public class FeedIntakeOperatorDescriptor extends
AbstractSingleActivityOperator
try {
adapterFactory = (ITypedAdapterFactory)
(classLoader.loadClass(adaptorFactoryClassName).newInstance());
adapterFactory.setOutputType(adapterOutputType);
- adapterFactory.configure(null, adaptorConfiguration,
ctx.getWarningCollector());
+ adapterFactory.configure(null, adaptorConfiguration,
ctx.getWarningCollector(),
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index 4b0148e678..3f7ae0e727 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.provider;
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.types.ARecordType;
@@ -40,7 +41,8 @@ public class AdapterFactoryProvider {
// get adapter factory. this method has the side effect of modifying the
configuration as necessary
public static ITypedAdapterFactory getAdapterFactory(ICCServiceContext
serviceCtx, String adapterName,
Map<String, String> configuration, ARecordType itemType,
ARecordType metaType,
- IWarningCollector warningCollector) throws HyracksDataException,
AlgebricksException {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws HyracksDataException, AlgebricksException {
ExternalDataUtils.defaultConfiguration(configuration);
ExternalDataUtils.prepare(adapterName, configuration);
ICcApplicationContext context = (ICcApplicationContext)
serviceCtx.getApplicationContext();
@@ -48,7 +50,7 @@ public class AdapterFactoryProvider {
(ITypedAdapterFactory)
context.getAdapterFactoryService().createAdapterFactory();
adapterFactory.setOutputType(itemType);
adapterFactory.setMetaType(metaType);
- adapterFactory.configure(serviceCtx, configuration, warningCollector);
+ adapterFactory.configure(serviceCtx, configuration, warningCollector,
filterEvaluatorFactory);
return adapterFactory;
}
}
diff --git
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
index c65e00d860..4f36915ca0 100644
---
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -50,7 +51,7 @@ public class RecordWithPKTestReaderFactory implements
IRecordReaderFactory<Recor
@Override
public void configure(IServiceContext serviceCtx, final Map<String,
String> configuration,
- IWarningCollector warningCollector) {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory) {
this.serviceCtx = serviceCtx;
}
diff --git
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
index 49ca23e18d..a21aeb62cd 100644
---
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -60,7 +61,7 @@ public class KVTestReaderFactory implements
IRecordReaderFactory<DCPRequest> {
@Override
public void configure(IServiceContext serviceCtx, final Map<String,
String> configuration,
- IWarningCollector warningCollector) {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory) {
this.serviceCtx = serviceCtx;
if (configuration.containsKey("num-of-records")) {
numOfRecords =
Integer.parseInt(configuration.get("num-of-records"));
diff --git
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index f36e35b27f..651b1909a7 100644
---
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.dataflow.TupleForwarder;
import org.apache.asterix.external.parser.ADMDataParser;
@@ -106,7 +107,7 @@ public class TestTypedAdapterFactory implements
ITypedAdapterFactory {
@Override
public void configure(ICCServiceContext serviceContext, Map<String,
String> configuration,
- IWarningCollector warningCollector) {
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory) {
this.configuration = configuration;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 9d4d508bcf..25eab7fe37 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -139,7 +140,8 @@ public class DatasetDataSource extends DataSource {
properties = addSubPath(externalDataSource.getProperties(),
properties);
properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE,
String.valueOf(externalScanBufferSize));
ITypedAdapterFactory adapterFactory =
metadataProvider.getConfiguredAdapterFactory(externalDataset,
- edd.getAdapter(), properties, (ARecordType) itemType,
null, context.getWarningCollector());
+ edd.getAdapter(), properties, (ARecordType) itemType,
context.getWarningCollector(),
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
return metadataProvider.getExternalDatasetScanRuntime(jobSpec,
itemType, adapterFactory,
tupleFilterFactory, outputLimit);
case INTERNAL:
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
index 5503cb07dd..22bfb3184e 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.metadata.api.IDatasourceFunction;
@@ -52,8 +53,8 @@ public class FunctionDataSourceFactory implements
IRecordReaderFactory<char[]> {
}
@Override
- public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector)
- throws AlgebricksException, HyracksDataException {
+ public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException, HyracksDataException {
// No Op
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index ef95511f84..222ce816ff 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.metadata.entities.Dataset;
@@ -144,7 +145,8 @@ public class LoadableDataSource extends DataSource {
LoadableDataSource alds = (LoadableDataSource) dataSource;
ARecordType itemType = (ARecordType) alds.getLoadedType();
ITypedAdapterFactory adapterFactory =
metadataProvider.getConfiguredAdapterFactory(alds.getTargetDataset(),
- alds.getAdapter(), alds.getAdapterProperties(), itemType,
null, context.getWarningCollector());
+ alds.getAdapter(), alds.getAdapterProperties(), itemType,
context.getWarningCollector(),
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv,
opSchema, context);
return metadataProvider.getLoadableDatasetScanRuntime(jobSpec,
adapterFactory, rDesc);
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index c5b8ea37d8..e15063b61e 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -43,6 +43,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.metadata.LockList;
@@ -899,13 +900,13 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
}
protected ITypedAdapterFactory getConfiguredAdapterFactory(Dataset
dataset, String adapterName,
- Map<String, String> configuration, ARecordType itemType,
ARecordType metaType,
- IWarningCollector warningCollector) throws AlgebricksException {
+ Map<String, String> configuration, ARecordType itemType,
IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException {
try {
configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE,
dataset.getDataverseName().getCanonicalForm());
return
AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(),
adapterName,
- configuration, itemType, metaType, warningCollector);
+ configuration, itemType, null, warningCollector,
filterEvaluatorFactory);
} catch (Exception e) {
throw new AlgebricksException("Unable to create adapter", e);
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 0ee9516f3c..e9944eeac4 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -32,6 +32,7 @@ import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.common.external.IDataSourceAdapter.AdapterType;
+import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.adapter.factory.ExternalAdapterFactory;
@@ -147,7 +148,8 @@ public class FeedMetadataUtil {
}
adapterFactory.setOutputType(adapterOutputType);
adapterFactory.setMetaType(metaType);
- adapterFactory.configure(appCtx.getServiceContext(),
configuration, warningCollector);
+ adapterFactory.configure(appCtx.getServiceContext(),
configuration, warningCollector,
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
if (metaType == null &&
configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME)) {
metaType = getOutputType(feed,
configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME));
if (metaType == null) {
@@ -227,10 +229,12 @@ public class FeedMetadataUtil {
}
adapterFactory.setOutputType(adapterOutputType);
adapterFactory.setMetaType(metaType);
- adapterFactory.configure(appCtx.getServiceContext(),
configuration, NoOpWarningCollector.INSTANCE);
+ adapterFactory.configure(appCtx.getServiceContext(),
configuration, NoOpWarningCollector.INSTANCE,
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
} else {
adapterFactory =
AdapterFactoryProvider.getAdapterFactory(appCtx.getServiceContext(),
adapterName,
- configuration, adapterOutputType, metaType,
NoOpWarningCollector.INSTANCE);
+ configuration, adapterOutputType, metaType,
NoOpWarningCollector.INSTANCE,
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
adapterType = IDataSourceAdapter.AdapterType.INTERNAL;
}
if (metaType == null) {