This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 1c16d60fef94bfd82790d9c1d2ba82e25def9a52 Author: harshal <[email protected]> AuthorDate: Thu Aug 24 22:23:58 2023 +0530 [HUDI-6735] Adding support for snapshotLoadQuerySplitter for incremental sources. (#9501) Snapshot load scan of historical table (having majority of data in archived timeline) causes large batch processing. Adding interface to support breaking snapshot load query into batches which can have commitId as checkpoint. --------- Co-authored-by: Sagar Sumit <[email protected]> --- .../hudi/utilities/sources/HoodieIncrSource.java | 17 ++++- .../sources/SnapshotLoadQuerySplitter.java | 78 ++++++++++++++++++++++ .../hudi/utilities/sources/helpers/QueryInfo.java | 12 ++++ .../utilities/sources/TestHoodieIncrSource.java | 22 +++++- .../helpers/TestSnapshotQuerySplitterImpl.java | 51 ++++++++++++++ 5 files changed, 174 insertions(+), 6 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index 0141f5ad458..fa316cf806f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.config.HoodieIncrSourceConfig; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -50,12 +51,14 @@ import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; 
import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger; +import static org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode; public class HoodieIncrSource extends RowSource { private static final Logger LOG = LoggerFactory.getLogger(HoodieIncrSource.class); + private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter; public static class Config { @@ -128,6 +131,10 @@ public class HoodieIncrSource extends RowSource { public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider); + + this.snapshotLoadQuerySplitter = Option.ofNullable(props.getString(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null)) + .map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className, + new Class<?>[] {TypedProperties.class}, props)); } @Override @@ -184,9 +191,13 @@ public class HoodieIncrSource extends RowSource { .load(srcPath); } else { // if checkpoint is missing from source table, and if strategy is set to READ_UPTO_LATEST_COMMIT, we have to issue snapshot query - source = sparkSession.read().format("org.apache.hudi") - .option(QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL()) - .load(srcPath) + Dataset<Row> snapshot = sparkSession.read().format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL()) + .load(srcPath); + if (snapshotLoadQuerySplitter.isPresent()) { + queryInfo = snapshotLoadQuerySplitter.get().getNextCheckpoint(snapshot, queryInfo); + } + source = snapshot // add filtering so that only interested records are returned. 
.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, queryInfo.getStartInstant())) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java new file mode 100644 index 00000000000..6a13607b1d5 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.sources.helpers.QueryInfo; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +/** + * Abstract splitter responsible for managing the snapshot load query operations. + */ +public abstract class SnapshotLoadQuerySplitter { + + /** + * Configuration properties for the splitter. + */ + protected final TypedProperties properties; + + /** + * Configurations for the SnapshotLoadQuerySplitter. 
+ */ + public static class Config { + /** + * Property for the snapshot load query splitter class name. + */ + public static final String SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME = "hoodie.deltastreamer.snapshotload.query.splitter.class.name"; + } + + /** + * Constructor initializing the properties. + * + * @param properties Configuration properties for the splitter. + */ + public SnapshotLoadQuerySplitter(TypedProperties properties) { + this.properties = properties; + } + + /** + * Abstract method to retrieve the next checkpoint. + * + * @param df The dataset to process. + * @param beginCheckpointStr The starting checkpoint string. + * @return The next checkpoint as an Option. + */ + public abstract Option<String> getNextCheckpoint(Dataset<Row> df, String beginCheckpointStr); + + /** + * Retrieves the next checkpoint based on query information. + * + * @param df The dataset to process. + * @param queryInfo The query information object. + * @return Updated query information with the next checkpoint, in case of empty checkpoint, + * returning endPoint same as queryInfo.getEndInstant(). 
+ */ + public QueryInfo getNextCheckpoint(Dataset<Row> df, QueryInfo queryInfo) { + return getNextCheckpoint(df, queryInfo.getStartInstant()) + .map(checkpoint -> queryInfo.withUpdatedEndInstant(checkpoint)) + .orElse(queryInfo); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java index 4e4ee275829..a510daf4de3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java @@ -97,6 +97,18 @@ public class QueryInfo { return orderByColumns; } + public QueryInfo withUpdatedEndInstant(String newEndInstant) { + return new QueryInfo( + this.queryType, + this.previousInstant, + this.startInstant, + newEndInstant, + this.orderColumn, + this.keyColumn, + this.limitColumn + ); + } + @Override public String toString() { return ("Query information for Incremental Source " diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java index 6502b4a60b1..301b6472de1 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java @@ -44,6 +44,7 @@ import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.utilities.sources.helpers.TestSnapshotQuerySplitterImpl; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -287,6 +288,15 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness { assertTrue(compactionInstant.get().getTimestamp().compareTo(latestCommitTimestamp) < 
0); } + // test SnapshotLoadQuerySplitter to split snapshot query. + // Reads only first commit + readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, + Option.empty(), + 100, + dataBatches.get(0).getKey(), + Option.of(TestSnapshotQuerySplitterImpl.class.getName())); + writeClient.close(); + // The pending tables services should not block the incremental pulls // Reads everything up to latest readAndAssert( @@ -315,15 +325,16 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness { Option.of(dataBatches.get(6).getKey()), 0, dataBatches.get(6).getKey()); - - writeClient.close(); } - private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount, String expectedCheckpoint) { + private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount, + String expectedCheckpoint, Option<String> snapshotCheckPointImplClassOpt) { Properties properties = new Properties(); properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath()); properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name()); + snapshotCheckPointImplClassOpt.map(className -> + properties.setProperty(SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, className)); TypedProperties typedProperties = new TypedProperties(properties); HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc(), spark(), new DummySchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA)); @@ -338,6 +349,11 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness { assertEquals(expectedCheckpoint, batchCheckPoint.getRight()); } + private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, + int expectedCount, String 
expectedCheckpoint) { + readAndAssert(missingCheckpointStrategy, checkpointToPull, expectedCount, expectedCheckpoint, Option.empty()); + } + private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, WriteOperationType writeOperationType, List<HoodieRecord> insertRecords, diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestSnapshotQuerySplitterImpl.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestSnapshotQuerySplitterImpl.java new file mode 100644 index 00000000000..4ba79e8978a --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestSnapshotQuerySplitterImpl.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import java.util.List; + +import static org.apache.spark.sql.functions.col; +import static org.apache.spark.sql.functions.lit; + +public class TestSnapshotQuerySplitterImpl extends SnapshotLoadQuerySplitter { + + private static final String COMMIT_TIME_METADATA_FIELD = HoodieRecord.COMMIT_TIME_METADATA_FIELD; + + /** + * Constructor initializing the properties. + * + * @param properties Configuration properties for the splitter. + */ + public TestSnapshotQuerySplitterImpl(TypedProperties properties) { + super(properties); + } + + @Override + public Option<String> getNextCheckpoint(Dataset<Row> df, String beginCheckpointStr) { + List<Row> row = df.filter(col(COMMIT_TIME_METADATA_FIELD).gt(lit(beginCheckpointStr))) + .orderBy(col(COMMIT_TIME_METADATA_FIELD)).limit(1).collectAsList(); + return Option.ofNullable(row.size() > 0 ? row.get(0).getAs(COMMIT_TIME_METADATA_FIELD) : null); + } +}
