This is an automated email from the ASF dual-hosted git repository. alsuliman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit c74bdc8d3100a8724b24bffa6ff25d01d8b339c1 Author: Ali Alsuliman <[email protected]> AuthorDate: Sun Mar 21 21:31:28 2021 -0700 [NO ISSUE][EXT] Make read buffer size configurable - user model changes: yes - storage format changes: no - interface changes: no Details: Make read buffer size configurable for external datasets using the compiler property "compiler.externalscanmemory" Change-Id: Ieb32b347b02fb800f13947f53267faa4837f3248 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10624 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- .../optimizer/rules/ConstantFoldingRule.java | 3 +- .../apache/asterix/api/common/APIFramework.java | 11 ++-- .../external_dataset.000.ddl.sqlpp | 66 ++++++++++++++++++++++ .../external_dataset.001.query.sqlpp | 13 +---- .../external_dataset.002.query.sqlpp | 13 +---- .../external_dataset.003.query.sqlpp | 13 +---- .../external_dataset.004.query.sqlpp | 13 +---- .../external_dataset.005.query.sqlpp | 13 +---- .../external_dataset.006.query.sqlpp | 13 +---- .../external_dataset.999.ddl.sqlpp | 12 +--- .../s3/custom-buffer-size/external_dataset.001.adm | 1 + .../s3/custom-buffer-size/external_dataset.002.adm | 1 + .../s3/custom-buffer-size/external_dataset.003.adm | 1 + .../s3/custom-buffer-size/external_dataset.004.adm | 25 ++++++++ .../s3/custom-buffer-size/external_dataset.005.adm | 1 + .../s3/custom-buffer-size/external_dataset.006.adm | 25 ++++++++ .../runtimets/testsuite_external_dataset.xml | 5 ++ .../asterix/common/config/CompilerProperties.java | 12 +++- .../common/config/OptimizationConfUtil.java | 15 +++++ .../input/record/reader/aws/AwsS3InputStream.java | 5 +- .../record/reader/stream/StreamRecordReader.java | 6 +- .../input/stream/AsterixInputStreamReader.java | 16 +++--- .../external/util/ExternalDataConstants.java | 1 + .../asterix/external/util/ExternalDataUtils.java | 6 ++ .../metadata/declared/DatasetDataSource.java | 13 ++++- .../api/HeuristicCompilerFactoryBuilder.java | 2 +- .../algebricks/core/config/AlgebricksConfig.java | 3 + .../algebricks/core/jobgen/impl/JobGenContext.java | 9 ++- .../rewriter/base/PhysicalOptimizationConfig.java | 9 +++ 29 files changed, 231 insertions(+), 95 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java index 47e0373..6f359c5 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java @@ -83,6 +83,7 @@ import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVis import org.apache.hyracks.algebricks.core.config.AlgebricksConfig; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; @@ -146,7 +147,7 @@ public class ConstantFoldingRule implements IAlgebraicRewriteRule { BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE, null, new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())), ExpressionTypeComputer.INSTANCE, null, null, null, null, GlobalConfig.DEFAULT_FRAME_SIZE, null, - NoOpWarningCollector.INSTANCE, 0); + NoOpWarningCollector.INSTANCE, 0, new PhysicalOptimizationConfig()); } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index d573b52..c863080 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -134,11 +134,12 @@ public class APIFramework { CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_WINDOWMEMORY_KEY, CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY, CompilerProperties.COMPILER_PARALLELISM_KEY, CompilerProperties.COMPILER_SORT_PARALLEL_KEY, CompilerProperties.COMPILER_SORT_SAMPLES_KEY, - FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, FuzzyUtils.SIM_FUNCTION_PROP_NAME, - FuzzyUtils.SIM_THRESHOLD_PROP_NAME, StartFeedStatement.WAIT_FOR_COMPLETION, - FeedActivityDetails.FEED_POLICY_NAME, FeedActivityDetails.COLLECT_LOCATIONS, - SqlppQueryRewriter.INLINE_WITH_OPTION, SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, - "hash_merge", "output-record-type", AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION, + CompilerProperties.COMPILER_EXTERNALSCANMEMORY_KEY, FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, + FuzzyUtils.SIM_FUNCTION_PROP_NAME, FuzzyUtils.SIM_THRESHOLD_PROP_NAME, + StartFeedStatement.WAIT_FOR_COMPLETION, FeedActivityDetails.FEED_POLICY_NAME, + FeedActivityDetails.COLLECT_LOCATIONS, SqlppQueryRewriter.INLINE_WITH_OPTION, + SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, "hash_merge", "output-record-type", + AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION, DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION); private final IRewriterFactory rewriterFactory; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.000.ddl.sqlpp new file mode 100644 index 0000000..22a30ca --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.000.ddl.sqlpp @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +drop dataverse test if exists; +create dataverse test; +use test; + +drop type test if exists; +create type test as open { +}; + +drop dataset test1 if exists; +create external dataset test1(test) using S3 ( +("accessKeyId"="dummyAccessKey"), +("secretAccessKey"="dummySecretKey"), +("region"="us-west-2"), +("serviceEndpoint"="http://localhost:8001"), +("container"="playground"), +("definition"="json-data/reviews/single-line/json"), +("format"="json")); + +drop dataset test2 if exists; +create external dataset test2(test) using S3 ( +("accessKeyId"="dummyAccessKey"), +("secretAccessKey"="dummySecretKey"), +("region"="us-west-2"), +("serviceEndpoint"="http://localhost:8001"), +("container"="playground"), +("definition"="json-data/reviews/multi-lines/json"), +("format"="json")); + +drop dataset test3 if exists; +create external dataset test3(test) using S3 ( +("accessKeyId"="dummyAccessKey"), +("secretAccessKey"="dummySecretKey"), +("region"="us-west-2"), +("serviceEndpoint"="http://localhost:8001"), +("container"="playground"), +("definition"="json-data/reviews/multi-lines-with-arrays/json"), +("format"="json")); + +drop dataset test4 if exists; +create external dataset test4(test) using S3 ( +("accessKeyId"="dummyAccessKey"), +("secretAccessKey"="dummySecretKey"), +("region"="us-west-2"), +("serviceEndpoint"="http://localhost:8001"), +("container"="playground"), +("definition"="json-data/reviews/multi-lines-with-nested-objects/json"), +("format"="json")); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.001.query.sqlpp similarity index 63% copy from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.001.query.sqlpp index 15bb54b..7c1708a 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.001.query.sqlpp @@ -16,15 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.algebricks.core.config; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +use test; +set `compiler.externalscanmemory` "8KB"; +select count(*) `count` from test1; -public class AlgebricksConfig { - - public static final String ALGEBRICKS_LOGGER_NAME = "org.apache.hyracks.algebricks"; - public static final Logger ALGEBRICKS_LOGGER = LogManager.getLogger(ALGEBRICKS_LOGGER_NAME); - public static final int SORT_SAMPLES = 100; - public static final boolean SORT_PARALLEL = true; -} diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.002.query.sqlpp similarity index 63% copy from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.002.query.sqlpp index 15bb54b..d122f42 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.002.query.sqlpp @@ -16,15 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.algebricks.core.config; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +use test; +set `compiler.externalscanmemory` "16KB"; +select count(*) `count` from test2; -public class AlgebricksConfig { - - public static final String ALGEBRICKS_LOGGER_NAME = "org.apache.hyracks.algebricks"; - public static final Logger ALGEBRICKS_LOGGER = LogManager.getLogger(ALGEBRICKS_LOGGER_NAME); - public static final int SORT_SAMPLES = 100; - public static final boolean SORT_PARALLEL = true; -} diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.003.query.sqlpp similarity index 63% copy from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.003.query.sqlpp index 15bb54b..552f943 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.003.query.sqlpp @@ -16,15 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.algebricks.core.config; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +use test; +set `compiler.externalscanmemory` "32KB"; +select count(*) `count` from test3; -public class AlgebricksConfig { - - public static final String ALGEBRICKS_LOGGER_NAME = "org.apache.hyracks.algebricks"; - public static final Logger ALGEBRICKS_LOGGER = LogManager.getLogger(ALGEBRICKS_LOGGER_NAME); - public static final int SORT_SAMPLES = 100; - public static final boolean SORT_PARALLEL = true; -} diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.004.query.sqlpp similarity index 63% copy from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.004.query.sqlpp index 15bb54b..f30af80 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.004.query.sqlpp @@ -16,15 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.algebricks.core.config; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +use test; +set `compiler.externalscanmemory` "8KB"; +select value test3 from test3 order by id; -public class AlgebricksConfig { - - public static final String ALGEBRICKS_LOGGER_NAME = "org.apache.hyracks.algebricks"; - public static final Logger ALGEBRICKS_LOGGER = LogManager.getLogger(ALGEBRICKS_LOGGER_NAME); - public static final int SORT_SAMPLES = 100; - public static final boolean SORT_PARALLEL = true; -} diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.005.query.sqlpp similarity index 63% copy from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.005.query.sqlpp index 15bb54b..af6aff0 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.005.query.sqlpp @@ -16,15 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.algebricks.core.config; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +use test; +set `compiler.externalscanmemory` "16KB"; +select count(*) `count` from test4; -public class AlgebricksConfig { - - public static final String ALGEBRICKS_LOGGER_NAME = "org.apache.hyracks.algebricks"; - public static final Logger ALGEBRICKS_LOGGER = LogManager.getLogger(ALGEBRICKS_LOGGER_NAME); - public static final int SORT_SAMPLES = 100; - public static final boolean SORT_PARALLEL = true; -} diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.006.query.sqlpp similarity index 63% copy from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.006.query.sqlpp index 15bb54b..854ac80 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.006.query.sqlpp @@ -16,15 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.algebricks.core.config; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +use test; +set `compiler.externalscanmemory` "10KB"; +select value test4 from test4 order by id; -public class AlgebricksConfig { - - public static final String ALGEBRICKS_LOGGER_NAME = "org.apache.hyracks.algebricks"; - public static final Logger ALGEBRICKS_LOGGER = LogManager.getLogger(ALGEBRICKS_LOGGER_NAME); - public static final int SORT_SAMPLES = 100; - public static final boolean SORT_PARALLEL = true; -} diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.999.ddl.sqlpp similarity index 63% copy from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.999.ddl.sqlpp index 15bb54b..548e632 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/custom-buffer-size/external_dataset.999.ddl.sqlpp @@ -16,15 +16,5 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.algebricks.core.config; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class AlgebricksConfig { - - public static final String ALGEBRICKS_LOGGER_NAME = "org.apache.hyracks.algebricks"; - public static final Logger ALGEBRICKS_LOGGER = LogManager.getLogger(ALGEBRICKS_LOGGER_NAME); - public static final int SORT_SAMPLES = 100; - public static final boolean SORT_PARALLEL = true; -} +drop dataverse test if exists; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.001.adm new file mode 100644 index 0000000..187a8cb --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.001.adm @@ -0,0 +1 @@ +{ "count": 100 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.002.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.002.adm new file mode 100644 index 0000000..187a8cb --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.002.adm @@ -0,0 +1 @@ +{ "count": 100 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.003.adm new file mode 100644 index 0000000..5db606c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.003.adm @@ -0,0 +1 @@ +{ "count": 25 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.004.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.004.adm new file mode 100644 index 0000000..7660e7e --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.004.adm @@ -0,0 +1,25 @@ +{ "id": 1, "year": null, "quarter": null, "review": "good", "array": [ 1, 2, 3 ] } +{ "id": 1, "year": null, "quarter": null, "review": "good", "array": [ 1, 2, 3 ] } +{ "id": 1, "year": null, "quarter": null, "review": "good", "array": [ 1, 2, 3 ] } +{ "id": 1, "year": null, "quarter": null, "review": "good", "array": [ 1, 2, 3 ] } +{ "id": 1, "year": null, "quarter": null, "review": "good", "array": [ 1, 2, 3 ] } +{ "id": 2, "year": null, "quarter": null, "review": "good", "array": [ 1, [ 1, 2 ], [ 1 ] ] } +{ "id": 2, "year": null, "quarter": null, "review": "good", "array": [ 1, [ 1, 2 ], [ 1 ] ] } +{ "id": 2, "year": null, "quarter": null, "review": "good", "array": [ 1, [ 1, 2 ], [ 1 ] ] } +{ "id": 2, "year": null, "quarter": null, "review": "good", "array": [ 1, [ 1, 2 ], [ 1 ] ] } +{ "id": 2, "year": null, "quarter": null, "review": "good", "array": [ 1, [ 1, 2 ], [ 1 ] ] } +{ "id": 3, "year": 2018, "quarter": null, "review": "good", "array": [ 1, 2, 3 ] } +{ "id": 3, "year": 2018, "quarter": null, "review": "good", "array": [ 1, 2, 3 ] } +{ "id": 3, "year": 2018, "quarter": null, "review": "good", "array": [ 1, 2, 3 ] } +{ "id": 3, "year": 2018, "quarter": null, "review": "good", "array": [ 1, 2, 3 ] } +{ "id": 3, "year": 2018, "quarter": null, "review": "good", "array": [ 1, 2, 3 ] } +{ "id": 4, "year": 2018, "quarter": null, "review": "bad", "array": [ 1, 2, 3 ] } +{ "id": 4, "year": 2018, "quarter": null, "review": "bad", "array": [ 1, 2, 3 ] } +{ "id": 4, "year": 2018, "quarter": null, "review": "bad", "array": [ 1, 2, 3 ] } +{ "id": 4, "year": 2018, "quarter": null, "review": "bad", "array": [ 1, 2, 3 ] } +{ "id": 4, "year": 2018, "quarter": null, "review": "bad", "array": [ 1, 2, 3 ] } +{ "id": 5, "year": 2018, "quarter": 1, "review": "good", "array": [ 1, 2, 3 ] } +{ "id": 5, "year": 2018, "quarter": 1, "review": "good", "array": [ 1, 2, 3 ] } +{ "id": 5, "year": 2018, "quarter": 1, "review": "good", "array": [ 1, 2, 3 ] } +{ "id": 5, "year": 2018, "quarter": 1, "review": "good", "array": [ 1, 2, 3 ] } +{ "id": 5, "year": 2018, "quarter": 1, "review": "good", "array": [ 1, 2, 3 ] } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.005.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.005.adm new file mode 100644 index 0000000..5db606c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.005.adm @@ -0,0 +1 @@ +{ "count": 25 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.006.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.006.adm new file mode 100644 index 0000000..7643986 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/custom-buffer-size/external_dataset.006.adm @@ -0,0 +1,25 @@ +{ "id": 1, "year": null, "quarter": null, "review": "good", "array": [ 1, 2, 3 ], "nested": { "id": 1 } } +{ "id": 1, "year": null, "quarter": null, "review": "good", "array": [ 1, 2, 3 ], "nested": { "id": 1 } } +{ "id": 1, "year": null, "quarter": null, "review": "good", "array": [ 1, 2, 3 ], "nested": { "id": 1 } } +{ "id": 1, "year": null, "quarter": null, "review": "good", "array": [ 1, 2, 3 ], "nested": { "id": 1 } } +{ "id": 1, "year": null, "quarter": null, "review": "good", "array": [ 1, 2, 3 ], "nested": { "id": 1 } } +{ "id": 2, "year": null, "quarter": null, "review": "good", "array": [ 1, [ 1, 2 ], [ 1 ] ], "nested": { "id": 1 }, "nested2": [ { "id": 1 } ] } +{ "id": 2, "year": null, "quarter": null, "review": "good", "array": [ 1, [ 1, 2 ], [ 1 ] ], "nested": { "id": 1 }, "nested2": [ { "id": 1 } ] } +{ "id": 2, "year": null, "quarter": null, "review": "good", "array": [ 1, [ 1, 2 ], [ 1 ] ], "nested": { "id": 1 }, "nested2": [ { "id": 1 } ] } +{ "id": 2, "year": null, "quarter": null, "review": "good", "array": [ 1, [ 1, 2 ], [ 1 ] ], "nested": { "id": 1 }, "nested2": [ { "id": 1 } ] } +{ "id": 2, "year": null, "quarter": null, "review": "good", "array": [ 1, [ 1, 2 ], [ 1 ] ], "nested": { "id": 1 }, "nested2": [ { "id": 1 } ] } +{ "id": 3, "year": 2018, "quarter": null, "review": "good", "array": [ 1, 2, 3, { "nested": { "array": [ { "nested": { "array": [ 1, 2 ] } } ] } } ] } +{ "id": 3, "year": 2018, "quarter": null, "review": "good", "array": [ 1, 2, 3, { "nested": { "array": [ { "nested": { "array": [ 1, 2 ] } } ] } } ] } +{ "id": 3, "year": 2018, "quarter": null, "review": "good", "array": [ 1, 2, 3, { "nested": { "array": [ { "nested": { "array": [ 1, 2 ] } } ] } } ] } +{ "id": 3, "year": 2018, "quarter": null, "review": "good", "array": [ 1, 2, 3, { "nested": { "array": [ { "nested": { "array": [ 1, 2 ] } } ] } } ] } +{ "id": 3, "year": 2018, "quarter": null, "review": "good", "array": [ 1, 2, 3, { "nested": { "array": [ { "nested": { "array": [ 1, 2 ] } } ] } } ] } +{ "id": 4, "year": 2018, "quarter": null, "review": "bad", "array": [ 1, 2, 3, { "nested1": { "id": 1, "nested2": { "id": 2, "nested3": [ { "nested4": null } ] } } } ] } +{ "id": 4, "year": 2018, "quarter": null, "review": "bad", "array": [ 1, 2, 3, { "nested1": { "id": 1, "nested2": { "id": 2, "nested3": [ { "nested4": null } ] } } } ] } +{ "id": 4, "year": 2018, "quarter": null, "review": "bad", "array": [ 1, 2, 3, { "nested1": { "id": 1, "nested2": { "id": 2, "nested3": [ { "nested4": null } ] } } } ] } +{ "id": 4, "year": 2018, "quarter": null, "review": "bad", "array": [ 1, 2, 3, { "nested1": { "id": 1, "nested2": { "id": 2, "nested3": [ { "nested4": null } ] } } } ] } +{ "id": 4, "year": 2018, "quarter": null, "review": "bad", "array": [ 1, 2, 3, { "nested1": { "id": 1, "nested2": { "id": 2, "nested3": [ { "nested4": null } ] } } } ] } +{ "id": 5, "year": 2018, "quarter": 1, "review": "good", "array": [ 1, 2, 3, { "nested": { "array": [ 1, 2 ] } } ] } +{ "id": 5, "year": 2018, "quarter": 1, "review": "good", "array": [ 1, 2, 3, { "nested": { "array": [ 1, 2 ] } } ] } +{ "id": 5, "year": 2018, "quarter": 1, "review": "good", "array": [ 1, 2, 3, { "nested": { "array": [ 1, 2 ] } } ] } +{ "id": 5, "year": 2018, "quarter": 1, "review": "good", "array": [ 1, 2, 3, { "nested": { "array": [ 1, 2 ] } } ] } +{ "id": 5, "year": 2018, "quarter": 1, "review": "good", "array": [ 1, 2, 3, { "nested": { "array": [ 1, 2 ] } } ] } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml index 9cded35..096e511 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml @@ -19,6 +19,11 @@ !--> <test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp"> <test-group name="external-dataset"> + <test-case FilePath="external-dataset/aws/s3/"> + <compilation-unit name="custom-buffer-size"> + <output-dir compare="Text">custom-buffer-size</output-dir> + </compilation-unit> + </test-case> <test-case FilePath="external-dataset"> <compilation-unit name="aws/s3/json/json"> <output-dir compare="Text">aws/s3/json/json</output-dir> diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java index 8860495..652f390 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java @@ -56,6 +56,10 @@ public class CompilerProperties extends AbstractProperties { LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(32L, MEGABYTE), "The memory budget (in bytes) for an inverted-index-search operator instance in a partition"), + COMPILER_EXTERNALSCANMEMORY( + INTEGER_BYTE_UNIT, + StorageUtil.getIntSizeInBytes(4, KILOBYTE), + "The memory budget (in bytes) for an external scan operator instance in a partition"), COMPILER_FRAMESIZE( INTEGER_BYTE_UNIT, StorageUtil.getIntSizeInBytes(32, KILOBYTE), @@ -110,7 +114,7 @@ public class CompilerProperties extends AbstractProperties { @Override public boolean hidden() { - return this == COMPILER_STRINGOFFSET; + return this == COMPILER_STRINGOFFSET || this == COMPILER_EXTERNALSCANMEMORY; } } @@ -130,6 +134,8 @@ public class CompilerProperties extends AbstractProperties { public static final String COMPILER_SORT_SAMPLES_KEY = Option.COMPILER_SORT_SAMPLES.ini(); + public static final String COMPILER_EXTERNALSCANMEMORY_KEY = Option.COMPILER_EXTERNALSCANMEMORY.ini(); + public static final int COMPILER_PARALLELISM_AS_STORAGE = 0; public CompilerProperties(PropertiesAccessor accessor) { @@ -176,4 +182,8 @@ public class CompilerProperties extends AbstractProperties { public int getSortSamples() { return accessor.getInt(Option.COMPILER_SORT_SAMPLES); } + + public int getExternalScanMemorySize() { + return accessor.getInt(Option.COMPILER_EXTERNALSCANMEMORY); + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java index 23fdcac..22fb13b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java @@ -59,6 +59,9 @@ public class OptimizationConfUtil { int textSearchFrameLimit = getTextSearchNumFrames(compilerProperties, querySpecificConfig, sourceLoc); int sortNumSamples = getSortSamples(compilerProperties, querySpecificConfig, sourceLoc); boolean fullParallelSort = getSortParallel(compilerProperties, querySpecificConfig); + int externalScanBufferSize = getExternalScanBufferSize( + (String) querySpecificConfig.get(CompilerProperties.COMPILER_EXTERNALSCANMEMORY_KEY), + compilerProperties.getExternalScanMemorySize(), sourceLoc); PhysicalOptimizationConfig physOptConf = new PhysicalOptimizationConfig(); physOptConf.setFrameSize(frameSize); @@ -69,10 +72,22 @@ public class OptimizationConfUtil { physOptConf.setMaxFramesForTextSearch(textSearchFrameLimit); physOptConf.setSortParallel(fullParallelSort); physOptConf.setSortSamples(sortNumSamples); + physOptConf.setExternalScanBufferSize(externalScanBufferSize); return physOptConf; } + private static int getExternalScanBufferSize(String externalScanMemorySizeParameter, + int compilerExternalScanMemorySize, SourceLocation sourceLoc) throws AsterixException { + IOptionType<Integer> intByteParser = OptionTypes.INTEGER_BYTE_UNIT; + try { + return externalScanMemorySizeParameter != null ? intByteParser.parse(externalScanMemorySizeParameter) + : compilerExternalScanMemorySize; + } catch (IllegalArgumentException e) { + throw AsterixException.create(ErrorCode.COMPILATION_ERROR, sourceLoc, e.getMessage()); + } + } + public static int getSortNumFrames(CompilerProperties compilerProperties, Map<String, Object> querySpecificConfig, SourceLocation sourceLoc) throws AlgebricksException { return getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY, diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java index 9e10e6a..f558820 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java @@ -29,7 +29,6 @@ import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.input.stream.AbstractMultipleInputStream; -import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.CleanupUtils; @@ -48,6 +47,7 @@ public class AwsS3InputStream extends AbstractMultipleInputStream { // Configuration private final Map<String, String> configuration; + private final int bufferSize; private final S3Client s3Client; @@ -59,6 +59,7 @@ public class AwsS3InputStream extends AbstractMultipleInputStream { this.configuration = configuration; this.filePaths = filePaths; this.s3Client = buildAwsS3Client(configuration); + this.bufferSize = ExternalDataUtils.getOrDefaultBufferSize(configuration); } @Override @@ -101,7 +102,7 @@ public class AwsS3InputStream extends AbstractMultipleInputStream { // Use gzip stream if needed String filename = filePaths.get(nextFileIndex).toLowerCase(); if (filename.endsWith(".gz") || filename.endsWith(".gzip")) { - in = new GZIPInputStream(s3Client.getObject(getObjectRequest), ExternalDataConstants.DEFAULT_BUFFER_SIZE); + in = new GZIPInputStream(s3Client.getObject(getObjectRequest), bufferSize); } // Current file ready, point to the next file diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java index cb16de5..1e003a1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java @@ -33,7 +33,6 @@ import org.apache.asterix.external.api.IStreamNotificationHandler; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.input.record.CharArrayRecord; import org.apache.asterix.external.input.stream.AsterixInputStreamReader; -import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.FeedLogManager; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -51,9 +50,10 @@ public abstract class StreamRecordReader implements IRecordReader<char[]>, IStre private Supplier<String> previousDataSourceName = EMPTY_STRING; public void configure(AsterixInputStream inputStream, Map<String, String> config) { - this.reader = new AsterixInputStreamReader(inputStream); + int bufferSize = ExternalDataUtils.getOrDefaultBufferSize(config); + this.reader = new AsterixInputStreamReader(inputStream, bufferSize); record = new CharArrayRecord(); - inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE]; + inputBuffer = new char[bufferSize]; if (!ExternalDataUtils.isTrue(config, KEY_REDACT_WARNINGS)) { this.dataSourceName = reader::getStreamName; this.previousDataSourceName = reader::getPreviousStreamName; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java index 4e963e4..ddbd350 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java @@ -27,21 +27,23 @@ import java.nio.charset.StandardCharsets; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; -import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.FeedLogManager; import org.apache.hyracks.api.exceptions.HyracksDataException; public class AsterixInputStreamReader extends Reader { - private AsterixInputStream in; - private byte[] bytes = new byte[ExternalDataConstants.DEFAULT_BUFFER_SIZE]; - private ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - private CharBuffer charBuffer = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE); - private CharsetDecoder decoder; + private final AsterixInputStream in; + private final byte[] bytes; + private final ByteBuffer byteBuffer; + private final CharBuffer charBuffer; + private final CharsetDecoder decoder; private boolean done = false; private boolean remaining = false; - public AsterixInputStreamReader(AsterixInputStream in) { + public AsterixInputStreamReader(AsterixInputStream in, int bufferSize) { this.in = in; + this.bytes = new byte[bufferSize]; + this.byteBuffer = ByteBuffer.wrap(bytes); + this.charBuffer = CharBuffer.allocate(bufferSize); this.decoder = StandardCharsets.UTF_8.newDecoder(); this.byteBuffer.flip(); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 53306c5..1911083 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -117,6 +117,7 @@ public class ExternalDataConstants { // a string representing the NULL value public static final String KEY_NULL_STR = "null"; public static final String KEY_REDACT_WARNINGS = "redact-warnings"; + public static final String KEY_EXTERNAL_SCAN_BUFFER_SIZE = "external-scan-buffer-size"; /** * Keys for adapter name diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index c0a7a4d..bc9a038 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -20,6 +20,7 @@ package org.apache.asterix.external.util; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ESCAPE; +import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXTERNAL_SCAN_BUFFER_SIZE; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START; @@ -90,6 +91,11 @@ public class ExternalDataUtils { private ExternalDataUtils() { } + public static int getOrDefaultBufferSize(Map<String, String> configuration) { + String bufferSize = configuration.get(KEY_EXTERNAL_SCAN_BUFFER_SIZE); + return bufferSize != null ? Integer.parseInt(bufferSize) : ExternalDataConstants.DEFAULT_BUFFER_SIZE; + } + // Get a delimiter from the given configuration public static char validateGetDelimiter(Map<String, String> configuration) throws HyracksDataException { return validateCharOrDefault(configuration, KEY_DELIMITER, ExternalDataConstants.DEFAULT_DELIMITER.charAt(0)); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java index 71d389e..715092a 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java @@ -18,7 +18,10 @@ */ package org.apache.asterix.metadata.declared; +import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXTERNAL_SCAN_BUFFER_SIZE; + import java.util.List; +import java.util.Map; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.exceptions.CompilationException; @@ -42,6 +45,7 @@ import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; @@ -111,9 +115,12 @@ public class DatasetDataSource extends DataSource { externalDataset.getItemTypeDataverseName(), itemTypeName).getDatatype(); ExternalDatasetDetails edd = (ExternalDatasetDetails) externalDataset.getDatasetDetails(); - ITypedAdapterFactory adapterFactory = - metadataProvider.getConfiguredAdapterFactory(externalDataset, edd.getAdapter(), - edd.getProperties(), (ARecordType) itemType, null, context.getWarningCollector()); + PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig(); + int externalScanBufferSize = physicalOptimizationConfig.getExternalScanBufferSize(); + Map<String, String> properties = edd.getProperties(); + properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE, String.valueOf(externalScanBufferSize)); + ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(externalDataset, + edd.getAdapter(), properties, (ARecordType) itemType, null, context.getWarningCollector()); return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory); case INTERNAL: DataSourceId id = getId(); diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java index 0fd107f..b96d657 100644 --- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java +++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java @@ -105,7 +105,7 @@ public class HeuristicCompilerFactoryBuilder extends AbstractCompilerFactoryBuil normalizedKeyComputerFactoryProvider, expressionRuntimeProvider, expressionTypeComputer, oc, expressionEvalSizeComputer, partialAggregationTypeComputer, predEvaluatorFactoryProvider, physicalOptimizationConfig.getFrameSize(), - clusterLocations, warningCollector, maxWarnings); + clusterLocations, warningCollector, maxWarnings, physicalOptimizationConfig); PlanCompiler pc = new PlanCompiler(context); return pc.compilePlan(plan, jobEventListenerFactory); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java index 15bb54b..7630a13 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.algebricks.core.config; +import org.apache.hyracks.util.StorageUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -26,5 +27,7 @@ public class AlgebricksConfig { public static final String ALGEBRICKS_LOGGER_NAME = "org.apache.hyracks.algebricks"; public static final Logger ALGEBRICKS_LOGGER = LogManager.getLogger(ALGEBRICKS_LOGGER_NAME); public static final int SORT_SAMPLES = 100; + public static final int EXTERNAL_SCAN_BUFFER_SIZE = + StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE); public static final boolean SORT_PARALLEL = true; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java index 4bc5689..3c03b18 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java @@ -34,6 +34,7 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvir import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory; import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider; @@ -68,6 +69,7 @@ public class JobGenContext { private final IPartialAggregationTypeComputer partialAggregationTypeComputer; private final IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider; private final int frameSize; + private final PhysicalOptimizationConfig physicalOptimizationConfig; private AlgebricksAbsolutePartitionConstraint clusterLocations; private int varCounter; private final ITypingContext typingContext; @@ -88,7 +90,7 @@ public class JobGenContext { IPartialAggregationTypeComputer partialAggregationTypeComputer, IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider, int frameSize, AlgebricksAbsolutePartitionConstraint clusterLocations, IWarningCollector warningCollector, - long maxWarnings) { + long maxWarnings, PhysicalOptimizationConfig physicalOptimizationConfig) { this.outerFlowSchema = outerFlowSchema; this.metadataProvider = metadataProvider; this.appContext = appContext; @@ -110,6 +112,7 @@ public class JobGenContext { this.partialAggregationTypeComputer = partialAggregationTypeComputer; this.predEvaluatorFactoryProvider = predEvaluatorFactoryProvider; this.frameSize = frameSize; + this.physicalOptimizationConfig = physicalOptimizationConfig; this.varCounter = 0; this.warningCollector = warningCollector; this.maxWarnings = maxWarnings; @@ -220,4 +223,8 @@ public class JobGenContext { public long getMaxWarnings() { return maxWarnings; } + + public PhysicalOptimizationConfig getPhysicalOptimizationConfig() { + return physicalOptimizationConfig; + } } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java index 598497c..93f5d74 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java @@ -39,6 +39,7 @@ public class PhysicalOptimizationConfig { private static final String DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE = "DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE"; private static final String SORT_PARALLEL = "SORT_PARALLEL"; private static final String SORT_SAMPLES = "SORT_SAMPLES"; + private static final String EXTERNAL_SCAN_BUFFER_SIZE = "EXTERNAL_SCAN_BUFFER_SIZE"; private Properties properties = new Properties(); @@ -172,6 +173,14 @@ public class PhysicalOptimizationConfig { setInt(SORT_SAMPLES, sortSamples); } + public int getExternalScanBufferSize() { + return getInt(EXTERNAL_SCAN_BUFFER_SIZE, AlgebricksConfig.EXTERNAL_SCAN_BUFFER_SIZE); + } + + public void setExternalScanBufferSize(int bufferSize) { + setInt(EXTERNAL_SCAN_BUFFER_SIZE, bufferSize); + } + private void setInt(String property, int value) { properties.setProperty(property, Integer.toString(value)); }
