This is an automated email from the ASF dual-hosted git repository. alsuliman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit ce5f3aa366421a1106c842c3a818e283fe050f3d Merge: 5b5ac48 3ea8ef1 Author: Murtadha Hubail <[email protected]> AuthorDate: Wed Apr 7 00:13:08 2021 +0300 Merge mad-hatter into cheshire-cat Change-Id: I6e2ad0d4cf85f8706063db84b7af9fd4f6c83843 .../optimizer/rules/ConstantFoldingRule.java | 3 +- .../apache/asterix/api/common/APIFramework.java | 3 +- .../external_dataset.000.ddl.sqlpp | 66 ++++++++++++++++++++++ .../external_dataset.001.query.sqlpp | 23 ++++++++ .../external_dataset.002.query.sqlpp | 23 ++++++++ .../external_dataset.003.query.sqlpp | 23 ++++++++ .../external_dataset.004.query.sqlpp | 23 ++++++++ .../external_dataset.005.query.sqlpp | 23 ++++++++ .../external_dataset.006.query.sqlpp | 23 ++++++++ .../external_dataset.999.ddl.sqlpp | 20 +++++++ .../custom-buffer-size/external_dataset.001.adm | 1 + .../custom-buffer-size/external_dataset.002.adm | 1 + .../custom-buffer-size/external_dataset.003.adm | 1 + .../custom-buffer-size/external_dataset.004.adm | 25 ++++++++ .../custom-buffer-size/external_dataset.005.adm | 1 + .../custom-buffer-size/external_dataset.006.adm | 25 ++++++++ .../asterix/common/config/CompilerProperties.java | 15 +++++ .../common/config/OptimizationConfUtil.java | 16 +++++- .../input/record/reader/aws/AwsS3InputStream.java | 5 +- .../record/reader/stream/StreamRecordReader.java | 6 +- .../input/stream/AsterixInputStreamReader.java | 11 ++-- .../StandardUTF8ToModifiedUTF8DataOutput.java | 2 +- .../external/util/ExternalDataConstants.java | 1 + .../asterix/external/util/ExternalDataUtils.java | 6 ++ .../metadata/declared/DatasetDataSource.java | 6 ++ .../api/HeuristicCompilerFactoryBuilder.java | 3 +- .../algebricks/core/config/AlgebricksConfig.java | 3 + .../algebricks/core/jobgen/impl/JobGenContext.java | 9 ++- .../rewriter/base/PhysicalOptimizationConfig.java | 9 +++ 29 files changed, 357 insertions(+), 19 deletions(-) diff --cc asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java index e99817d,6f359c5..0c2eeba --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java @@@ -147,11 -144,10 +148,11 @@@ public class ConstantFoldingRule implem jobGenCtx = new JobGenContext(null, metadataProvider, appCtx, SerializerDeserializerProvider.INSTANCE, BinaryHashFunctionFactoryProvider.INSTANCE, BinaryHashFunctionFamilyProvider.INSTANCE, BinaryComparatorFactoryProvider.INSTANCE, TypeTraitProvider.INSTANCE, BinaryBooleanInspector.FACTORY, - BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE, null, + BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE, + UnnestingPositionWriterFactory.INSTANCE, null, new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())), ExpressionTypeComputer.INSTANCE, null, null, null, null, GlobalConfig.DEFAULT_FRAME_SIZE, null, - NoOpWarningCollector.INSTANCE, 0); + NoOpWarningCollector.INSTANCE, 0, new PhysicalOptimizationConfig()); } @Override diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index 0091304,c863080..919a24e --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@@ -130,22 -129,18 +130,23 @@@ public class APIFramework public static final String PREFIX_INTERNAL_PARAMETERS = "_internal"; // A white list of supported configurable parameters. - private static final Set<String> CONFIGURABLE_PARAMETER_NAMES = - ImmutableSet.of(CompilerProperties.COMPILER_JOINMEMORY_KEY, CompilerProperties.COMPILER_GROUPMEMORY_KEY, - CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_WINDOWMEMORY_KEY, - CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY, CompilerProperties.COMPILER_PARALLELISM_KEY, - CompilerProperties.COMPILER_SORT_PARALLEL_KEY, CompilerProperties.COMPILER_SORT_SAMPLES_KEY, - CompilerProperties.COMPILER_EXTERNALSCANMEMORY_KEY, FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, - FuzzyUtils.SIM_FUNCTION_PROP_NAME, FuzzyUtils.SIM_THRESHOLD_PROP_NAME, - StartFeedStatement.WAIT_FOR_COMPLETION, FeedActivityDetails.FEED_POLICY_NAME, - FeedActivityDetails.COLLECT_LOCATIONS, SqlppQueryRewriter.INLINE_WITH_OPTION, - SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, "hash_merge", "output-record-type", - AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION, - DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION); + private static final Set<String> CONFIGURABLE_PARAMETER_NAMES = ImmutableSet.of( + CompilerProperties.COMPILER_JOINMEMORY_KEY, CompilerProperties.COMPILER_GROUPMEMORY_KEY, + CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_WINDOWMEMORY_KEY, + CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY, CompilerProperties.COMPILER_PARALLELISM_KEY, + CompilerProperties.COMPILER_SORT_PARALLEL_KEY, CompilerProperties.COMPILER_SORT_SAMPLES_KEY, - CompilerProperties.COMPILER_INDEXONLY_KEY, CompilerProperties.COMPILER_INTERNAL_SANITYCHECK_KEY, ++ CompilerProperties.COMPILER_EXTERNALSCANMEMORY_KEY, CompilerProperties.COMPILER_INDEXONLY_KEY, ++ CompilerProperties.COMPILER_INTERNAL_SANITYCHECK_KEY, + CompilerProperties.COMPILER_EXTERNAL_FIELD_PUSHDOWN_KEY, CompilerProperties.COMPILER_SUBPLAN_MERGE_KEY, + CompilerProperties.COMPILER_SUBPLAN_NESTEDPUSHDOWN_KEY, + CompilerProperties.COMPILER_MIN_MEMORY_ALLOCATION_KEY, FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, + FuzzyUtils.SIM_FUNCTION_PROP_NAME, FuzzyUtils.SIM_THRESHOLD_PROP_NAME, + StartFeedStatement.WAIT_FOR_COMPLETION, FeedActivityDetails.FEED_POLICY_NAME, + FeedActivityDetails.COLLECT_LOCATIONS, SqlppQueryRewriter.INLINE_WITH_OPTION, + SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, "hash_merge", "output-record-type", + DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION, + SetAsterixPhysicalOperatorsRule.REWRITE_ATTEMPT_BATCH_ASSIGN, + EquivalenceClassUtils.REWRITE_INTERNAL_QUERYUID_PK); private final IRewriterFactory rewriterFactory; private final IAstPrintVisitorFactory astPrintVisitorFactory; diff --cc asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.000.ddl.sqlpp index 0000000,22a30ca..22a30ca mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.000.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.000.ddl.sqlpp diff --cc asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.001.query.sqlpp index 0000000,7c1708a..7c1708a mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.001.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.001.query.sqlpp diff --cc asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.002.query.sqlpp index 0000000,d122f42..d122f42 mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.002.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.002.query.sqlpp diff --cc asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.003.query.sqlpp index 0000000,552f943..552f943 mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.003.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.003.query.sqlpp diff --cc asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.004.query.sqlpp index 0000000,f30af80..f30af80 mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.004.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.004.query.sqlpp diff --cc asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.005.query.sqlpp index 0000000,af6aff0..af6aff0 mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.005.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.005.query.sqlpp diff --cc asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.006.query.sqlpp index 0000000,854ac80..854ac80 mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.006.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.006.query.sqlpp diff --cc asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.999.ddl.sqlpp index 0000000,548e632..548e632 mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.999.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/custom-buffer-size/external_dataset.999.ddl.sqlpp diff --cc asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.001.adm index 0000000,187a8cb..187a8cb mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.001.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.001.adm diff --cc asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.002.adm index 0000000,187a8cb..187a8cb mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.002.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.002.adm diff --cc asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.003.adm index 0000000,5db606c..5db606c mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.003.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.003.adm diff --cc asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.004.adm index 0000000,7660e7e..7660e7e mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.004.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.004.adm diff --cc asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.005.adm index 0000000,5db606c..5db606c mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.005.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.005.adm diff --cc asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.006.adm index 0000000,7643986..7643986 mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.006.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/custom-buffer-size/external_dataset.006.adm diff --cc asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java index 866d183,652f390..a58673f --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java @@@ -55,6 -56,10 +55,10 @@@ public class CompilerProperties extend LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(32L, MEGABYTE), "The memory budget (in bytes) for an inverted-index-search operator instance in a partition"), + COMPILER_EXTERNALSCANMEMORY( + INTEGER_BYTE_UNIT, - StorageUtil.getIntSizeInBytes(4, KILOBYTE), ++ StorageUtil.getIntSizeInBytes(8, KILOBYTE), + "The memory budget (in bytes) for an external scan operator instance in a partition"), COMPILER_FRAMESIZE( INTEGER_BYTE_UNIT, StorageUtil.getIntSizeInBytes(32, KILOBYTE), @@@ -126,6 -111,11 +130,11 @@@ public Object defaultValue() { return defaultValue; } + + @Override + public boolean hidden() { - return this == COMPILER_STRINGOFFSET || this == COMPILER_EXTERNALSCANMEMORY; ++ return this == COMPILER_EXTERNALSCANMEMORY; + } } public static final String COMPILER_SORTMEMORY_KEY = Option.COMPILER_SORTMEMORY.ini(); @@@ -144,18 -134,8 +153,20 @@@ public static final String COMPILER_SORT_SAMPLES_KEY = Option.COMPILER_SORT_SAMPLES.ini(); + public static final String COMPILER_INDEXONLY_KEY = Option.COMPILER_INDEXONLY.ini(); + + public static final String COMPILER_INTERNAL_SANITYCHECK_KEY = Option.COMPILER_INTERNAL_SANITYCHECK.ini(); + + public static final String COMPILER_EXTERNAL_FIELD_PUSHDOWN_KEY = Option.COMPILER_EXTERNAL_FIELD_PUSHDOWN.ini(); + + public static final String COMPILER_SUBPLAN_MERGE_KEY = Option.COMPILER_SUBPLAN_MERGE.ini(); + + public static final String COMPILER_SUBPLAN_NESTEDPUSHDOWN_KEY = Option.COMPILER_SUBPLAN_NESTEDPUSHDOWN.ini(); + + public static final String COMPILER_MIN_MEMORY_ALLOCATION_KEY = Option.COMPILER_MIN_MEMORY_ALLOCATION.ini(); + + public static final String COMPILER_EXTERNALSCANMEMORY_KEY = Option.COMPILER_EXTERNALSCANMEMORY.ini(); + public static final int COMPILER_PARALLELISM_AS_STORAGE = 0; public CompilerProperties(PropertiesAccessor accessor) { @@@ -198,27 -183,7 +209,31 @@@ return accessor.getInt(Option.COMPILER_SORT_SAMPLES); } + public boolean isIndexOnly() { + return accessor.getBoolean(Option.COMPILER_INDEXONLY); + } + + public boolean isSanityCheck() { + return accessor.getBoolean(Option.COMPILER_INTERNAL_SANITYCHECK); + } + + public boolean isFieldAccessPushdown() { + return accessor.getBoolean(Option.COMPILER_EXTERNAL_FIELD_PUSHDOWN); + } + + public boolean getSubplanMerge() { + return accessor.getBoolean(Option.COMPILER_SUBPLAN_MERGE); + } + + public boolean getSubplanNestedPushdown() { + return accessor.getBoolean(Option.COMPILER_SUBPLAN_NESTEDPUSHDOWN); + } + + public boolean getMinMemoryAllocation() { + return accessor.getBoolean(Option.COMPILER_MIN_MEMORY_ALLOCATION); + } ++ + public int getExternalScanMemorySize() { + return accessor.getInt(Option.COMPILER_EXTERNALSCANMEMORY); + } } diff --cc asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java index 8832054,22fb13b..c4dbd85 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java @@@ -58,21 -58,11 +58,23 @@@ public class OptimizationConfUtil compilerProperties.getWindowMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_WINDOW, sourceLoc); int textSearchFrameLimit = getTextSearchNumFrames(compilerProperties, querySpecificConfig, sourceLoc); int sortNumSamples = getSortSamples(compilerProperties, querySpecificConfig, sourceLoc); - boolean fullParallelSort = getSortParallel(compilerProperties, querySpecificConfig); + boolean fullParallelSort = getBoolean(querySpecificConfig, CompilerProperties.COMPILER_SORT_PARALLEL_KEY, + compilerProperties.getSortParallel()); + boolean indexOnly = getBoolean(querySpecificConfig, CompilerProperties.COMPILER_INDEXONLY_KEY, + compilerProperties.isIndexOnly()); + boolean sanityCheck = getBoolean(querySpecificConfig, CompilerProperties.COMPILER_INTERNAL_SANITYCHECK_KEY, + compilerProperties.isSanityCheck()); + boolean externalFieldPushdown = getBoolean(querySpecificConfig, + CompilerProperties.COMPILER_EXTERNAL_FIELD_PUSHDOWN_KEY, compilerProperties.isFieldAccessPushdown()); + boolean subplanMerge = getBoolean(querySpecificConfig, CompilerProperties.COMPILER_SUBPLAN_MERGE_KEY, + compilerProperties.getSubplanMerge()); + boolean subplanNestedPushdown = getBoolean(querySpecificConfig, + CompilerProperties.COMPILER_SUBPLAN_NESTEDPUSHDOWN_KEY, compilerProperties.getSubplanNestedPushdown()); + boolean minMemoryAllocation = getBoolean(querySpecificConfig, + CompilerProperties.COMPILER_MIN_MEMORY_ALLOCATION_KEY, compilerProperties.getMinMemoryAllocation()); - + int externalScanBufferSize = getExternalScanBufferSize( + (String) querySpecificConfig.get(CompilerProperties.COMPILER_EXTERNALSCANMEMORY_KEY), + compilerProperties.getExternalScanMemorySize(), sourceLoc); - PhysicalOptimizationConfig physOptConf = new PhysicalOptimizationConfig(); physOptConf.setFrameSize(frameSize); physOptConf.setMaxFramesExternalSort(sortFrameLimit); @@@ -82,12 -72,8 +84,13 @@@ physOptConf.setMaxFramesForTextSearch(textSearchFrameLimit); physOptConf.setSortParallel(fullParallelSort); physOptConf.setSortSamples(sortNumSamples); + physOptConf.setIndexOnly(indexOnly); + physOptConf.setSanityCheckEnabled(sanityCheck); + physOptConf.setExternalFieldPushdown(externalFieldPushdown); + physOptConf.setSubplanMerge(subplanMerge); + physOptConf.setSubplanNestedPushdown(subplanNestedPushdown); + physOptConf.setMinMemoryAllocation(minMemoryAllocation); + physOptConf.setExternalScanBufferSize(externalScanBufferSize); - return physOptConf; } diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java index 4288cc4,8bd7a51..5eb8475 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java @@@ -41,35 -41,71 +41,34 @@@ import software.amazon.awssdk.core.exce import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.S3Exception; -public class AwsS3InputStream extends AbstractMultipleInputStream { - - private static final Logger LOGGER = LogManager.getLogger(); +public class AwsS3InputStream extends AbstractExternalInputStream { - private final S3Client s3Client; + // Configuration private final String bucket; - private final int bufferSize; - + private final S3Client s3Client; - - // File fields - private final List<String> filePaths; - private int nextFileIndex = 0; + private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException { - this.filePaths = filePaths; + super(configuration, filePaths); this.s3Client = buildAwsS3Client(configuration); - this.bufferSize = ExternalDataUtils.getOrDefaultBufferSize(configuration); - this.bucket = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME); + this.bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); } @Override - protected boolean advance() throws IOException { - // No files to read for this partition - if (filePaths == null || filePaths.isEmpty()) { - return false; - } - - // Finished reading all the files - if (nextFileIndex >= filePaths.size()) { - return false; - } - - // Close the current stream before going to the next one - if (in != null) { - CleanupUtils.close(in, null); - } - + protected boolean getInputStream() throws IOException { String fileName = filePaths.get(nextFileIndex); GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder(); - GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(fileName).build(); - + GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build(); - // Have a reference to the S3 stream to ensure that if GZipInputStream causes an IOException because of reading // the header, then the S3 stream gets closed in the close method - try { - in = s3Client.getObject(getObjectRequest); - } catch (NoSuchKeyException ex) { - LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(getObjectRequest.key()) + " was not found in bucket " - + getObjectRequest.bucket()); - nextFileIndex++; - return advance(); - } catch (SdkException ex) { - throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage()); + if (!doGetInputStream(getObjectRequest)) { + return false; } -- // Use gzip stream if needed if (StringUtils.endsWithIgnoreCase(fileName, ".gz") || StringUtils.endsWithIgnoreCase(fileName, ".gzip")) { - in = new GZIPInputStream(in, bufferSize); - } - - // Current file ready, point to the next file - nextFileIndex++; - if (notificationHandler != null) { - notificationHandler.notifyNewSource(); + in = new GZIPInputStream(in, ExternalDataConstants.DEFAULT_BUFFER_SIZE); } return true; } diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java index d2499f8,ddbd350..81da0b0 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java @@@ -33,19 -32,19 +32,19 @@@ import org.apache.hyracks.api.exception public class AsterixInputStreamReader extends Reader { private final AsterixInputStream in; - private final byte[] bytes; - private final ByteBuffer byteBuffer; - private final CharBuffer charBuffer; private final CharsetDecoder decoder; - private final byte[] bytes; - private boolean done = false; - private boolean remaining = false; ++ private byte[] bytes; + protected final ByteBuffer byteBuffer; + protected final CharBuffer charBuffer; + protected boolean done = false; + protected boolean remaining = false; - public AsterixInputStreamReader(AsterixInputStream in) { + public AsterixInputStreamReader(AsterixInputStream in, int bufferSize) { this.in = in; + this.decoder = StandardCharsets.UTF_8.newDecoder(); - bytes = new byte[ExternalDataConstants.DEFAULT_BUFFER_SIZE]; - byteBuffer = ByteBuffer.wrap(bytes); - charBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE); + this.bytes = new byte[bufferSize]; + this.byteBuffer = ByteBuffer.wrap(bytes); + this.charBuffer = CharBuffer.allocate(bufferSize); - this.decoder = StandardCharsets.UTF_8.newDecoder(); this.byteBuffer.flip(); } diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/StandardUTF8ToModifiedUTF8DataOutput.java index be0d286,0000000..adeb80e mode 100644,000000..100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/StandardUTF8ToModifiedUTF8DataOutput.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/StandardUTF8ToModifiedUTF8DataOutput.java @@@ -1,158 -1,0 +1,158 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.stream; + +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.hyracks.data.std.util.ByteArrayAccessibleInputStream; + +/** + * Writes modified UTF-8 string format to {@link StandardUTF8ToModifiedUTF8DataOutput#out} + * from standard UTF-8 string format. + */ +public class StandardUTF8ToModifiedUTF8DataOutput implements DataOutput { + private static final byte[] EMPTY = new byte[0]; + private final AStringSerializerDeserializer stringSerDer; + private final ResettableUTF8InputStreamReader reader; + private final char[] inputBuffer; + private char[] appendBuffer; + private DataOutput out; + + public StandardUTF8ToModifiedUTF8DataOutput(AStringSerializerDeserializer stringSerDer) { + this.stringSerDer = stringSerDer; + reader = new ResettableUTF8InputStreamReader(new ByteArrayAccessibleInputStream(EMPTY, 0, 0)); + inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE]; + appendBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE]; + } + + @Override + public void write(int b) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + reader.prepareNextRead(b, off, len); + int numOfChars = reader.read(inputBuffer); + int length = 0; + while (numOfChars > 0) { + appendBuffer = append(inputBuffer, appendBuffer, length, numOfChars); + length += numOfChars; + numOfChars = reader.read(inputBuffer); + } + stringSerDer.serialize(appendBuffer, 0, length, out); + } + + @Override + public void writeBoolean(boolean v) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void writeByte(int v) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void writeShort(int v) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void writeChar(int v) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void writeInt(int v) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void writeLong(long v) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void writeFloat(float v) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void writeDouble(double v) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void writeBytes(String s) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void writeChars(String s) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void writeUTF(String s) throws IOException { + throw new UnsupportedOperationException(); + } + + public void setDataOutput(DataOutput out) { + this.out = out; + } + + private static char[] append(char[] src, char[] dest, int offset, int length) { + char[] destBuf = dest; + if (offset + length > dest.length) { + char[] newDestBuffer = new char[dest.length * 2]; + System.arraycopy(destBuf, 0, newDestBuffer, 0, offset); + destBuf = newDestBuffer; + } + System.arraycopy(src, 0, destBuf, offset, length); + return destBuf; + } + + private static class ResettableUTF8InputStreamReader extends AsterixInputStreamReader { + private final ByteArrayAccessibleInputStream inByte; + + public ResettableUTF8InputStreamReader(ByteArrayAccessibleInputStream inByte) { - super(new BasicInputStream(inByte)); ++ super(new BasicInputStream(inByte), ExternalDataConstants.DEFAULT_BUFFER_SIZE); + this.inByte = inByte; + } + + //Rewind the reader after setting the byte array + public void prepareNextRead(byte[] b, int off, int len) { + inByte.setContent(b, off, len); + done = false; + remaining = false; + byteBuffer.flip(); + charBuffer.flip(); + } + + } +} diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 047fe2a,1911083..a0cf387 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@@ -119,10 -117,10 +119,11 @@@ public class ExternalDataConstants // a string representing the NULL value public static final String KEY_NULL_STR = "null"; public static final String KEY_REDACT_WARNINGS = "redact-warnings"; + public static final String KEY_REQUESTED_FIELDS = "requested-fields"; + public static final String KEY_EXTERNAL_SCAN_BUFFER_SIZE = "external-scan-buffer-size"; /** - * Keys for adapter name + * Keys for adapter name **/ public static final String KEY_ADAPTER_NAME_TWITTER_PUSH = "twitter_push"; public static final String KEY_ADAPTER_NAME_PUSH_TWITTER = "push_twitter"; diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 2d25d08,bc9a038..4beb932 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@@ -18,20 -18,9 +18,21 @@@ */ package org.apache.asterix.external.util; +import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED; +import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ACCOUNT_KEY_FIELD_NAME; +import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ACCOUNT_NAME_FIELD_NAME; +import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.BLOB_ENDPOINT_FIELD_NAME; +import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_ACCOUNT_KEY; +import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_ACCOUNT_NAME; +import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_BLOB_ENDPOINT; +import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_ENDPOINT_SUFFIX; +import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_FIELD_NAME; +import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_SHARED_ACCESS_SIGNATURE; +import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ENDPOINT_SUFFIX_FIELD_NAME; +import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.SHARED_ACCESS_SIGNATURE_FIELD_NAME; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ESCAPE; + import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXTERNAL_SCAN_BUFFER_SIZE; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START; diff --cc asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java index 29a66b9,715092a..17f91b4 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java @@@ -18,7 -18,8 +18,9 @@@ */ package org.apache.asterix.metadata.declared; + import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXTERNAL_SCAN_BUFFER_SIZE; + +import java.util.HashMap; import java.util.List; import java.util.Map; @@@ -109,14 -115,16 +112,17 @@@ public class DatasetDataSource extends externalDataset.getItemTypeDataverseName(), itemTypeName).getDatatype(); ExternalDatasetDetails edd = (ExternalDatasetDetails) externalDataset.getDatasetDetails(); + PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig(); + int externalScanBufferSize = physicalOptimizationConfig.getExternalScanBufferSize(); - Map<String, String> properties = edd.getProperties(); + Map<String, String> properties = addProjectionInfo(projectionInfo, edd.getProperties()); + properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE, String.valueOf(externalScanBufferSize)); ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(externalDataset, edd.getAdapter(), properties, (ARecordType) itemType, null, context.getWarningCollector()); - return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory); + return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory, + tupleFilterFactory, outputLimit); case INTERNAL: DataSourceId id = getId(); - String dataverseName = id.getDataverseName(); + DataverseName dataverseName = id.getDataverseName(); String datasetName = id.getDatasourceName(); Index primaryIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, datasetName); diff --cc hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java index 49be31c,b96d657..c6a79a5 --- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java +++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java @@@ -102,12 -102,11 +102,11 @@@ public class HeuristicCompilerFactoryBu serializerDeserializerProvider, hashFunctionFactoryProvider, hashFunctionFamilyProvider, comparatorFactoryProvider, typeTraitProvider, binaryBooleanInspectorFactory, binaryIntegerInspectorFactory, printerProvider, missingWriterFactory, - normalizedKeyComputerFactoryProvider, expressionRuntimeProvider, expressionTypeComputer, - oc, expressionEvalSizeComputer, partialAggregationTypeComputer, - predEvaluatorFactoryProvider, physicalOptimizationConfig.getFrameSize(), - clusterLocations, warningCollector, maxWarnings, physicalOptimizationConfig); - + unnestingPositionWriterFactory, normalizedKeyComputerFactoryProvider, + expressionRuntimeProvider, expressionTypeComputer, oc, expressionEvalSizeComputer, + partialAggregationTypeComputer, predEvaluatorFactoryProvider, + physicalOptimizationConfig.getFrameSize(), clusterLocations, warningCollector, - maxWarnings); - ++ maxWarnings, physicalOptimizationConfig); PlanCompiler pc = new PlanCompiler(context); return pc.compilePlan(plan, jobEventListenerFactory); } diff --cc hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java index c99117b,7630a13..817db5a --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java @@@ -25,12 -26,8 +26,14 @@@ public class AlgebricksConfig public static final String ALGEBRICKS_LOGGER_NAME = "org.apache.hyracks.algebricks"; public static final Logger ALGEBRICKS_LOGGER = LogManager.getLogger(ALGEBRICKS_LOGGER_NAME); - public static final int SORT_SAMPLES = 100; + public static final int SORT_SAMPLES_DEFAULT = 100; + public static final boolean SORT_PARALLEL_DEFAULT = true; + public static final boolean INDEX_ONLY_DEFAULT = true; + public static final boolean SANITYCHECK_DEFAULT = false; + public static final boolean EXTERNAL_FIELD_PUSHDOWN_DEFAULT = false; + public static final boolean SUBPLAN_MERGE_DEFAULT = true; + public static final boolean SUBPLAN_NESTEDPUSHDOWN_DEFAULT = true; + public static final boolean MIN_MEMORY_ALLOCATION_DEFAULT = true; + public static final int EXTERNAL_SCAN_BUFFER_SIZE = - StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE); - public static final boolean SORT_PARALLEL = true; ++ StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.KILOBYTE); } diff --cc hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java index 92f579c,93f5d74..bfff925 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java @@@ -39,14 -39,9 +39,15 @@@ public class PhysicalOptimizationConfi private static final String DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE = "DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE"; private static final String SORT_PARALLEL = "SORT_PARALLEL"; private static final String SORT_SAMPLES = "SORT_SAMPLES"; + private static final String INDEX_ONLY = "INDEX_ONLY"; + private static final String SANITY_CHECK = "SANITY_CHECK"; + private static final String EXTERNAL_FIELD_PUSHDOWN = "EXTERNAL_FIELD_PUSHDOWN"; + private static final String SUBPLAN_MERGE = "SUBPLAN_MERGE"; + private static final String SUBPLAN_NESTEDPUSHDOWN = "SUBPLAN_NESTEDPUSHDOWN"; + private static final String MIN_MEMORY_ALLOCATION = "MIN_MEMORY_ALLOCATION"; + private static final String EXTERNAL_SCAN_BUFFER_SIZE = "EXTERNAL_SCAN_BUFFER_SIZE"; - private Properties properties = new Properties(); + private final Properties properties = new Properties(); public PhysicalOptimizationConfig() { int frameSize = 32768; @@@ -178,54 -173,14 +179,62 @@@ setInt(SORT_SAMPLES, sortSamples); } + public void setIndexOnly(boolean indexOnly) { + setBoolean(INDEX_ONLY, indexOnly); + } + + public boolean isIndexOnly() { + return getBoolean(INDEX_ONLY, AlgebricksConfig.INDEX_ONLY_DEFAULT); + } + + public void setSanityCheckEnabled(boolean sanityCheck) { + setBoolean(SANITY_CHECK, sanityCheck); + } + + public boolean isSanityCheckEnabled() { + return getBoolean(SANITY_CHECK, AlgebricksConfig.SANITYCHECK_DEFAULT); + } + + public boolean isExternalFieldPushdown() { + return getBoolean(EXTERNAL_FIELD_PUSHDOWN, AlgebricksConfig.EXTERNAL_FIELD_PUSHDOWN_DEFAULT); + } + + public void setExternalFieldPushdown(boolean externalFieldPushDown) { + setBoolean(EXTERNAL_FIELD_PUSHDOWN, externalFieldPushDown); + } + + public boolean getSubplanMerge() { + return getBoolean(SUBPLAN_MERGE, AlgebricksConfig.SUBPLAN_MERGE_DEFAULT); + } + + public void setSubplanMerge(boolean value) { + setBoolean(SUBPLAN_MERGE, value); + } + + public boolean getSubplanNestedPushdown() { + return getBoolean(SUBPLAN_NESTEDPUSHDOWN, AlgebricksConfig.SUBPLAN_NESTEDPUSHDOWN_DEFAULT); + } + + public void setSubplanNestedPushdown(boolean value) { + setBoolean(SUBPLAN_NESTEDPUSHDOWN, value); + } + + public boolean getMinMemoryAllocation() { + return getBoolean(MIN_MEMORY_ALLOCATION, AlgebricksConfig.MIN_MEMORY_ALLOCATION_DEFAULT); + } + + public void setMinMemoryAllocation(boolean value) { + setBoolean(MIN_MEMORY_ALLOCATION, value); + } + + public int getExternalScanBufferSize() { + return getInt(EXTERNAL_SCAN_BUFFER_SIZE, AlgebricksConfig.EXTERNAL_SCAN_BUFFER_SIZE); + } + + public void setExternalScanBufferSize(int bufferSize) { + setInt(EXTERNAL_SCAN_BUFFER_SIZE, bufferSize); + } + private void setInt(String property, int value) { properties.setProperty(property, Integer.toString(value)); }
