This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1949a90ff24 [HUDI-6999] Adding row writer support to HoodieStreamer
(#9913)
1949a90ff24 is described below
commit 1949a90ff24d3d87cb4a24a30aefac6dcd672e84
Author: Jon Vexler <[email protected]>
AuthorDate: Mon Nov 6 11:10:24 2023 -0500
[HUDI-6999] Adding row writer support to HoodieStreamer (#9913)
- Fixing row writer with deltastreamer and refactoring StreamSync to
accommodate row and avro formats
---------
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: sivabalan <[email protected]>
---
.../testsuite/HoodieDeltaStreamerWrapper.java | 4 +-
...eamerDatasetBulkInsertCommitActionExecutor.java | 66 +++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 23 +-
.../hudi/utilities/streamer/HoodieStreamer.java | 10 +-
.../utilities/streamer/HoodieStreamerUtils.java | 151 +++++++
.../apache/hudi/utilities/streamer/StreamSync.java | 444 +++++++++++----------
.../deltastreamer/HoodieDeltaStreamerTestBase.java | 4 +-
.../deltastreamer/TestHoodieDeltaStreamer.java | 164 +++++++-
.../TestHoodieDeltaStreamerDAGExecution.java | 2 +-
9 files changed, 644 insertions(+), 224 deletions(-)
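
For context: the new row-writer path is opt-in. A minimal sketch (not part of
this commit) of the property a Streamer run would carry to exercise it,
assuming a BULK_INSERT operation against a ROW-typed source:

    // Hypothetical setup; the key mirrors DataSourceWriteOptions.ENABLE_ROW_WRITER().
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.datasource.write.row.writer.enable", "true");
    // cfg.operation must be BULK_INSERT and the source must report
    // Source.SourceType.ROW for StreamSync to take the row-writer path.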
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 85987e36b5c..d8d5dad329d 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -25,6 +25,7 @@ import
org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.streamer.StreamSync;
import org.apache.spark.api.java.JavaRDD;
@@ -80,7 +81,8 @@ public class HoodieDeltaStreamerWrapper extends
HoodieDeltaStreamer {
StreamSync service = getDeltaSync();
service.refreshTimeline();
String instantTime = InProcessTimeGenerator.createNewInstantTime();
- return service.readFromSource(instantTime);
+ InputBatch inputBatch = service.readFromSource(instantTime).getLeft();
+ return Pair.of(inputBatch.getSchemaProvider(),
Pair.of(inputBatch.getCheckpointForNextBatch(), (JavaRDD<HoodieRecord>)
inputBatch.getBatch().get()));
}
public StreamSync getDeltaSync() {
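
readFromSource now returns Pair<InputBatch, Boolean> instead of the old nested
pair, so the wrapper above unpacks it back into the legacy shape. A condensed
sketch of the new contract, assuming the avro (non-row) path where the batch
holds a JavaRDD<HoodieRecord>:

    Pair<InputBatch, Boolean> result = service.readFromSource(instantTime);
    InputBatch batch = result.getLeft();  // records, checkpoint and schema provider
    boolean isEmpty = result.getRight();  // true when the batch was empty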
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
new file mode 100644
index 00000000000..5593a95ca39
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commit;
+
+import org.apache.hudi.HoodieDatasetBulkInsertHelper;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+/**
+ * Executor to be used by stream sync. Directly invokes
 HoodieDatasetBulkInsertHelper.bulkInsert so that WriteStatus is
+ * properly returned. The write is deliberately not committed here,
 because StreamSync performs the commit.
+ */
+public class HoodieStreamerDatasetBulkInsertCommitActionExecutor extends
BaseDatasetBulkInsertCommitActionExecutor {
+
+ public HoodieStreamerDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig
config, SparkRDDWriteClient writeClient, String instantTime) {
+ super(config, writeClient, instantTime);
+ }
+
+ @Override
+ protected void preExecute() {
+ // no op
+ }
+
+ @Override
+ protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>>
result) {
+ // no op
+ }
+
+ @Override
+ protected Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records,
boolean arePartitionRecordsSorted) {
+ table.getActiveTimeline().transitionRequestedToInflight(new
HoodieInstant(HoodieInstant.State.REQUESTED, getCommitActionType(),
instantTime), Option.empty());
+ return Option.of(HoodieDatasetBulkInsertHelper
+ .bulkInsert(records, instantTime, table, writeConfig,
arePartitionRecordsSorted, false));
+ }
+
+ @Override
+ public WriteOperationType getWriteOperationType() {
+ return WriteOperationType.BULK_INSERT;
+ }
+}
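
A condensed sketch of how StreamSync drives this executor (mirroring the
writeToSink change later in this commit):

    BaseDatasetBulkInsertCommitActionExecutor executor =
        new HoodieStreamerDatasetBulkInsertCommitActionExecutor(writeConfig, writeClient, instantTime);
    JavaRDD<WriteStatus> statuses =
        executor.execute(df, arePartitionRecordsSorted).getWriteStatuses();
    // The commit itself is issued later by StreamSync via writeClient.commit(...).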
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index a63f861480d..93b8c2fcf91 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -149,6 +149,19 @@ object HoodieSparkSqlWriter {
Metrics.shutdownAllMetrics()
}
+ def getBulkInsertRowConfig(writerSchema: Schema, hoodieConfig: HoodieConfig,
+ basePath: String, tblName: String):
HoodieWriteConfig = {
+ val writerSchemaStr = writerSchema.toString
+
+ // Make opts mutable since it could be modified by
tryOverrideParquetWriteLegacyFormatProperty
+ val opts = mutable.Map() ++ hoodieConfig.getProps.toMap ++
+ Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr)
+
+ // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
+ tryOverrideParquetWriteLegacyFormatProperty(opts,
convertAvroSchemaToStructType(writerSchema))
+ DataSourceUtils.createHoodieConfig(writerSchemaStr, basePath, tblName,
opts)
+ }
+
}
class HoodieSparkSqlWriterInternal {
@@ -940,15 +953,7 @@ class HoodieSparkSqlWriterInternal {
val sqlContext =
writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext
val jsc =
writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getJavaSparkContext
- val writerSchemaStr = writerSchema.toString
-
- // Make opts mutable since it could be modified by
tryOverrideParquetWriteLegacyFormatProperty
- val opts = mutable.Map() ++ hoodieConfig.getProps.toMap ++
- Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr)
-
- // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
- tryOverrideParquetWriteLegacyFormatProperty(opts,
convertAvroSchemaToStructType(writerSchema))
- val writeConfig = DataSourceUtils.createHoodieConfig(writerSchemaStr,
basePath.toString, tblName, opts)
+ val writeConfig =
HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema, hoodieConfig,
basePath.toString, tblName)
val overwriteOperationType =
Option(hoodieConfig.getString(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE))
.map(WriteOperationType.fromValue)
.orNull
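
The config assembly above is extracted into getBulkInsertRowConfig so that
StreamSync (Java) can build the same row-writer config; its
prepareHoodieConfigForRowWriter, later in this commit, boils down to:

    HoodieWriteConfig writeConfig = HoodieSparkSqlWriter.getBulkInsertRowConfig(
        writerSchema, hoodieConfig, cfg.targetBasePath, cfg.targetTableName);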
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 95534c5533f..5604a6240c7 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -93,6 +93,8 @@ import java.util.concurrent.Executors;
import static java.lang.String.format;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.utilities.UtilHelpers.buildProperties;
+import static org.apache.hudi.utilities.UtilHelpers.readConfig;
/**
 * A utility which can incrementally take the output from {@link
HiveIncrementalPuller} and apply it to the target
@@ -170,7 +172,7 @@ public class HoodieStreamer implements Serializable {
} else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES))
{
hoodieConfig.setAll(UtilHelpers.getConfig(cfg.configs).getProps());
} else {
- hoodieConfig.setAll(UtilHelpers.readConfig(hadoopConf, new
Path(cfg.propsFilePath), cfg.configs).getProps());
+ hoodieConfig.setAll(readConfig(hadoopConf, new Path(cfg.propsFilePath),
cfg.configs).getProps());
}
// set any configs that Deltastreamer has to override explicitly
@@ -429,6 +431,12 @@ public class HoodieStreamer implements Serializable {
&&
HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
}
+ public static TypedProperties getProps(FileSystem fs, Config cfg) {
+ return cfg.propsFilePath.isEmpty()
+ ? buildProperties(cfg.configs)
+ : readConfig(fs.getConf(), new Path(cfg.propsFilePath),
cfg.configs).getProps();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
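
The new static getProps helper centralizes property resolution. A minimal
sketch of a call, assuming a FileSystem handle and a populated Config:

    TypedProperties props = HoodieStreamer.getProps(fs, cfg);
    // Builds from cfg.configs alone when propsFilePath is empty, otherwise
    // reads the props file and overlays cfg.configs on top.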
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
new file mode 100644
index 00000000000..ad1de230f41
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.SerializableSchema;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.keygen.BuiltinKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.util.SparkKeyGenUtils;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.avro.HoodieAvroDeserializer;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS;
+
+
+/**
+ * Util class for HoodieStreamer.
+ */
+public class HoodieStreamerUtils {
+
+ /**
+ * Generates HoodieRecords for the avro data read from source.
+ * Takes care of dropping partition columns, precombine, and auto key generation.
+ * Both AVRO and SPARK record types are supported.
+ */
+ static JavaRDD<HoodieRecord> createHoodieRecords(HoodieStreamer.Config cfg,
TypedProperties props, Option<JavaRDD<GenericRecord>> avroRDDOptional,
+ SchemaProvider schemaProvider,
HoodieRecord.HoodieRecordType recordType, boolean autoGenerateRecordKeys,
+ String instantTime) {
+ boolean shouldCombine = cfg.filterDupes ||
cfg.operation.equals(WriteOperationType.UPSERT);
+ Set<String> partitionColumns = getPartitionColumns(props);
+ JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
+
+ JavaRDD<HoodieRecord> records;
+ SerializableSchema avroSchema = new
SerializableSchema(schemaProvider.getTargetSchema());
+ SerializableSchema processedAvroSchema = new
SerializableSchema(isDropPartitionColumns(props) ?
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
+ if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
+ records = avroRDD.mapPartitions(
+ (FlatMapFunction<Iterator<GenericRecord>, HoodieRecord>)
genericRecordIterator -> {
+ if (autoGenerateRecordKeys) {
+
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(TaskContext.getPartitionId()));
+
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
+ }
+ BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+ List<HoodieRecord> avroRecords = new ArrayList<>();
+ while (genericRecordIterator.hasNext()) {
+ GenericRecord genRec = genericRecordIterator.next();
+ HoodieKey hoodieKey = new
HoodieKey(builtinKeyGenerator.getRecordKey(genRec),
builtinKeyGenerator.getPartitionPath(genRec));
+ GenericRecord gr = isDropPartitionColumns(props) ?
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
+ HoodieRecordPayload payload = shouldCombine ?
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
+ (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
cfg.sourceOrderingField, false, props.getBoolean(
+
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
+ : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
+ avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload));
+ }
+ return avroRecords.iterator();
+ });
+ } else if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
+ // TODO we should remove it if we can read InternalRow from source.
+ records = avroRDD.mapPartitions(itr -> {
+ if (autoGenerateRecordKeys) {
+ props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(TaskContext.getPartitionId()));
+ props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG,
instantTime);
+ }
+ BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+ StructType baseStructType =
AvroConversionUtils.convertAvroSchemaToStructType(processedAvroSchema.get());
+ StructType targetStructType = isDropPartitionColumns(props) ?
AvroConversionUtils
+
.convertAvroSchemaToStructType(HoodieAvroUtils.removeFields(processedAvroSchema.get(),
partitionColumns)) : baseStructType;
+ HoodieAvroDeserializer deserializer =
SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(processedAvroSchema.get(),
baseStructType);
+
+ return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), rec
-> {
+ InternalRow row = (InternalRow) deserializer.deserialize(rec).get();
+ String recordKey = builtinKeyGenerator.getRecordKey(row,
baseStructType).toString();
+ String partitionPath = builtinKeyGenerator.getPartitionPath(row,
baseStructType).toString();
+ return new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath),
+ HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType,
targetStructType).apply(row), targetStructType, false);
+ });
+ });
+ } else {
+ throw new UnsupportedOperationException(recordType.name());
+ }
+ return records;
+ }
+
+ /**
+ * Set based on hoodie.datasource.write.drop.partition.columns config.
+ * When set to true, the partition columns will not be written into the table.
+ */
+ static Boolean isDropPartitionColumns(TypedProperties props) {
+ return props.getBoolean(DROP_PARTITION_COLUMNS.key(),
DROP_PARTITION_COLUMNS.defaultValue());
+ }
+
+ /**
+ * Get the partition columns as a set of strings.
+ *
+ * @param props TypedProperties
+ * @return Set of partition columns.
+ */
+ static Set<String> getPartitionColumns(TypedProperties props) {
+ String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
+ return
Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
+ }
+
+}
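
The record preparation that previously lived inline in StreamSync is now
callable in isolation. An illustrative call, mirroring the StreamSync usage
later in this commit:

    JavaRDD<HoodieRecord> records = HoodieStreamerUtils.createHoodieRecords(
        cfg, props, inputBatch.getBatch(), schemaProvider,
        recordType, autoGenerateRecordKeys, instantTime);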
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 936eef67a9b..82e058b7c0b 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -25,7 +25,6 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieConversionUtils;
import org.apache.hudi.HoodieSparkSqlWriter;
import org.apache.hudi.HoodieSparkUtils;
-import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
@@ -33,18 +32,16 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.commit.BaseDatasetBulkInsertCommitActionExecutor;
+import
org.apache.hudi.commit.HoodieStreamerDatasetBulkInsertCommitActionExecutor;
+import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
-import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -58,8 +55,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -73,9 +68,7 @@ import org.apache.hudi.exception.HoodieMetaSyncException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;
@@ -97,6 +90,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaSet;
import org.apache.hudi.utilities.schema.SimpleSchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.HoodieStreamer.Config;
import org.apache.hudi.utilities.transform.Transformer;
@@ -107,18 +101,12 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.avro.HoodieAvroDeserializer;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,20 +118,17 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
-import java.util.stream.Collectors;
import scala.Tuple2;
import scala.collection.JavaConversions;
import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
-import static
org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS;
import static
org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
import static
org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
@@ -265,6 +250,8 @@ public class StreamSync implements Serializable, Closeable {
private final boolean autoGenerateRecordKeys;
+ private final boolean useRowWriter;
+
@Deprecated
public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
SchemaProvider schemaProvider,
TypedProperties props, JavaSparkContext jssc, FileSystem
fs, Configuration conf,
@@ -297,13 +284,18 @@ public class StreamSync implements Serializable,
Closeable {
this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg,
sparkSession, props, hoodieSparkContext, fs);
this.errorWriteFailureStrategy =
ErrorTableUtils.getErrorWriteFailureStrategy(props);
}
- this.formatAdapter = new SourceFormatAdapter(
- UtilHelpers.createSource(cfg.sourceClassName, props,
hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics),
- this.errorTableWriter, Option.of(props));
+ Source source = UtilHelpers.createSource(cfg.sourceClassName, props,
hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics);
+ this.formatAdapter = new SourceFormatAdapter(source,
this.errorTableWriter, Option.of(props));
this.transformer =
UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames),
Option.ofNullable(schemaProvider).map(SchemaProvider::getSourceSchema),
this.errorTableWriter.isPresent());
-
+ if (this.cfg.operation == WriteOperationType.BULK_INSERT &&
source.getSourceType() == Source.SourceType.ROW
+ &&
this.props.getBoolean(DataSourceWriteOptions.ENABLE_ROW_WRITER().key(), false))
{
+ // enable row writer only when operation is BULK_INSERT, the source is
ROW type, and the row writer is explicitly enabled (it defaults to off).
+ this.useRowWriter = true;
+ } else {
+ this.useRowWriter = false;
+ }
}
/**
@@ -382,7 +374,7 @@ public class StreamSync implements Serializable, Closeable {
HoodieTableConfig.CDC_ENABLED.defaultValue()))
.setCDCSupplementalLoggingMode(props.getString(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key(),
HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.defaultValue()))
- .setShouldDropPartitionColumns(isDropPartitionColumns())
+
.setShouldDropPartitionColumns(HoodieStreamerUtils.isDropPartitionColumns(props))
.setHiveStylePartitioningEnable(props.getBoolean(HIVE_STYLE_PARTITIONING_ENABLE.key(),
Boolean.parseBoolean(HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())))
.setUrlEncodePartitioning(props.getBoolean(URL_ENCODE_PARTITIONING.key(),
@@ -408,19 +400,25 @@ public class StreamSync implements Serializable,
Closeable {
.build();
String instantTime = metaClient.createNewInstantTime();
- Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>>
srcRecordsWithCkpt = readFromSource(instantTime);
+ Pair<InputBatch, Boolean> inputBatchIsEmptyPair =
readFromSource(instantTime);
+
+ if (inputBatchIsEmptyPair != null) {
+ final JavaRDD<HoodieRecord> recordsFromSource;
+ if (useRowWriter) {
+ recordsFromSource = hoodieSparkContext.emptyRDD();
+ } else {
+ recordsFromSource = (JavaRDD<HoodieRecord>)
inputBatchIsEmptyPair.getKey().getBatch().get();
+ }
- if (srcRecordsWithCkpt != null) {
- final JavaRDD<HoodieRecord> recordsFromSource =
srcRecordsWithCkpt.getRight().getRight();
// this is the first input batch. If schemaProvider not set, use it and
register Avro Schema and start
// compactor
if (writeClient == null) {
- this.schemaProvider = srcRecordsWithCkpt.getKey();
+ this.schemaProvider =
inputBatchIsEmptyPair.getKey().getSchemaProvider();
// Setup HoodieWriteClient and compaction now that we decided on schema
setupWriteClient(recordsFromSource);
} else {
- Schema newSourceSchema = srcRecordsWithCkpt.getKey().getSourceSchema();
- Schema newTargetSchema = srcRecordsWithCkpt.getKey().getTargetSchema();
+ Schema newSourceSchema =
inputBatchIsEmptyPair.getKey().getSchemaProvider().getSourceSchema();
+ Schema newTargetSchema =
inputBatchIsEmptyPair.getKey().getSchemaProvider().getTargetSchema();
if (!(processedSchema.isSchemaPresent(newSourceSchema))
|| !(processedSchema.isSchemaPresent(newTargetSchema))) {
LOG.info("Seeing new schema. Source :" +
newSourceSchema.toString(true)
@@ -449,8 +447,7 @@ public class StreamSync implements Serializable, Closeable {
}
}
- result = writeToSink(instantTime, recordsFromSource,
- srcRecordsWithCkpt.getRight().getLeft(), metrics,
overallTimerContext);
+ result = writeToSinkAndDoMetaSync(instantTime,
inputBatchIsEmptyPair.getKey(), inputBatchIsEmptyPair.getValue(), metrics,
overallTimerContext);
}
metrics.updateStreamerSyncMetrics(System.currentTimeMillis());
@@ -476,11 +473,10 @@ public class StreamSync implements Serializable,
Closeable {
/**
* Read from Upstream Source and apply transformation if needed.
*
- * @return Pair<SchemaProvider, Pair < String, JavaRDD < HoodieRecord>>>
Input data read from upstream source, consists
- * of schemaProvider, checkpointStr and hoodieRecord
+ * @return Pair<InputBatch, Boolean>: input data read from the upstream
source, with the boolean set to true if the batch is empty.
* @throws Exception in case of any Exception
*/
- public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>>
readFromSource(String instantTime) throws IOException {
+ public Pair<InputBatch, Boolean> readFromSource(String instantTime) throws
IOException {
// Retrieve the previous round checkpoints, if any
Option<String> resumeCheckpointStr = Option.empty();
if (commitsTimelineOpt.isPresent()) {
@@ -495,10 +491,10 @@ public class StreamSync implements Serializable,
Closeable {
int maxRetryCount = cfg.retryOnSourceFailures ? cfg.maxRetryCount : 1;
int curRetryCount = 0;
- Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> sourceDataToSync
= null;
+ Pair<InputBatch, Boolean> sourceDataToSync = null;
while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) {
try {
- sourceDataToSync = fetchFromSource(resumeCheckpointStr, instantTime);
+ sourceDataToSync =
fetchFromSourceAndPrepareRecords(resumeCheckpointStr, instantTime);
} catch (HoodieSourceTimeoutException e) {
if (curRetryCount >= maxRetryCount) {
throw e;
@@ -515,17 +511,54 @@ public class StreamSync implements Serializable,
Closeable {
return sourceDataToSync;
}
- private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>>
fetchFromSource(Option<String> resumeCheckpointStr, String instantTime) {
+ private Pair<InputBatch, Boolean>
fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpointStr, String
instantTime) {
HoodieRecordType recordType = createRecordMerger(props).getRecordType();
if (recordType == HoodieRecordType.SPARK &&
HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
+ && !cfg.operation.equals(WriteOperationType.BULK_INSERT)
&&
HoodieLogBlockType.fromId(props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
"avro"))
!= HoodieLogBlockType.PARQUET_DATA_BLOCK) {
throw new UnsupportedOperationException("Spark record only support
parquet log.");
}
- final Option<JavaRDD<GenericRecord>> avroRDDOptional;
- final String checkpointStr;
- SchemaProvider schemaProvider;
+ InputBatch inputBatch = fetchNextBatchFromSource(resumeCheckpointStr);
+ final String checkpointStr = inputBatch.getCheckpointForNextBatch();
+ final SchemaProvider schemaProvider = inputBatch.getSchemaProvider();
+
+ // handle no new data and no change in checkpoint
+ if (!cfg.allowCommitOnNoCheckpointChange && Objects.equals(checkpointStr,
resumeCheckpointStr.orElse(null))) {
+ LOG.info("No new data, source checkpoint has not changed. Nothing to
commit. Old checkpoint=("
+ + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
+ String commitActionType = CommitUtils.getCommitActionType(cfg.operation,
HoodieTableType.valueOf(cfg.tableType));
+ hoodieMetrics.updateMetricsForEmptyData(commitActionType);
+ return null;
+ }
+
+ // handle empty batch with change in checkpoint
+ hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking
if input is empty");
+ Pair<InputBatch, Boolean> preparedInputBatchIsEmptyPair =
handleEmptyBatch(useRowWriter, inputBatch, checkpointStr, schemaProvider);
+ if (preparedInputBatchIsEmptyPair.getValue()) { // return if empty batch
+ return preparedInputBatchIsEmptyPair;
+ }
+
+ if (useRowWriter) { // no additional processing required for row writer.
+ return Pair.of(inputBatch, false);
+ } else {
+ JavaRDD<HoodieRecord> records =
HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(),
schemaProvider,
+ recordType, autoGenerateRecordKeys, instantTime);
+ return Pair.of(new InputBatch(Option.of(records), checkpointStr,
schemaProvider), false);
+ }
+ }
+
+ /**
+ * Fetch data from the source, apply transformations if any, align with the
schema from the schema provider if need be, and return the input batch.
+ * @param resumeCheckpointStr checkpoint to resume the source from.
+ * @return {@link InputBatch} containing the new batch of data from source
along with new checkpoint and schema provider instance to use.
+ */
+ private InputBatch fetchNextBatchFromSource(Option<String>
resumeCheckpointStr) {
+ Option<JavaRDD<GenericRecord>> avroRDDOptional = null;
+ String checkpointStr = null;
+ SchemaProvider schemaProvider = null;
+ InputBatch inputBatchForWriter = null; // row writer
if (transformer.isPresent()) {
// Transformation is needed. Fetch New rows in Row Format, apply
transformation and then convert them
// to generic records for writing
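
The restructured read path above, as a condensed outline (comments only, not
code from this commit):

    // 1. fetchNextBatchFromSource(...)   -> raw InputBatch (row or avro form)
    // 2. checkpoint unchanged and !allowCommitOnNoCheckpointChange -> return null
    // 3. handleEmptyBatch(...)           -> Pair(batch, true) when the batch is empty
    // 4. row writer: pass the InputBatch through untouched; otherwise convert it
    //    to HoodieRecords via HoodieStreamerUtils.createHoodieRecords(...)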
@@ -541,29 +574,37 @@ public class StreamSync implements Serializable,
Closeable {
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
boolean reconcileSchema =
props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
if (this.userProvidedSchemaProvider != null &&
this.userProvidedSchemaProvider.getTargetSchema() != null) {
- // If the target schema is specified through Avro schema,
- // pass in the schema for the Row-to-Avro conversion
- // to avoid nullability mismatch between Avro schema and Row schema
- if (errorTableWriter.isPresent()
- &&
props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
-
HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue())) {
- // If the above conditions are met, trigger error events for the
rows whose conversion to
- // avro records fails.
- avroRDDOptional = transformed.map(
- rowDataset -> {
- Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs =
HoodieSparkUtils.safeCreateRDD(rowDataset,
- HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE,
reconcileSchema,
-
Option.of(this.userProvidedSchemaProvider.getTargetSchema()));
-
errorTableWriter.get().addErrorEvents(safeCreateRDDs._2().toJavaRDD()
- .map(evStr -> new ErrorEvent<>(evStr,
- ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
- return safeCreateRDDs._1.toJavaRDD();
- });
+ if (useRowWriter) {
+ if (errorTableWriter.isPresent()) {
+ throw new HoodieException("Error table is not yet supported with
row writer");
+ }
+ inputBatchForWriter = new InputBatch(transformed, checkpointStr,
this.userProvidedSchemaProvider);
} else {
- avroRDDOptional = transformed.map(
- rowDataset -> getTransformedRDD(rowDataset, reconcileSchema,
this.userProvidedSchemaProvider.getTargetSchema()));
+ // non row writer path
+ // If the target schema is specified through Avro schema,
+ // pass in the schema for the Row-to-Avro conversion
+ // to avoid nullability mismatch between Avro schema and Row schema
+ if (errorTableWriter.isPresent()
+ &&
props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
+
HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue())) {
+ // If the above conditions are met, trigger error events for the
rows whose conversion to
+ // avro records fails.
+ avroRDDOptional = transformed.map(
+ rowDataset -> {
+ Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs =
HoodieSparkUtils.safeCreateRDD(rowDataset,
+ HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE,
reconcileSchema,
+
Option.of(this.userProvidedSchemaProvider.getTargetSchema()));
+
errorTableWriter.get().addErrorEvents(safeCreateRDDs._2().toJavaRDD()
+ .map(evStr -> new ErrorEvent<>(evStr,
+
ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
+ return safeCreateRDDs._1.toJavaRDD();
+ });
+ } else {
+ avroRDDOptional = transformed.map(
+ rowDataset -> getTransformedRDD(rowDataset, reconcileSchema,
this.userProvidedSchemaProvider.getTargetSchema()));
+ }
+ schemaProvider = this.userProvidedSchemaProvider;
}
- schemaProvider = this.userProvidedSchemaProvider;
} else {
Option<Schema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs,
cfg.targetBasePath);
// Deduce proper target (writer's) schema for the transformed dataset,
reconciling its
@@ -584,87 +625,59 @@ public class StreamSync implements Serializable,
Closeable {
(SchemaProvider) new DelegatingSchemaProvider(props,
hoodieSparkContext.jsc(), dataAndCheckpoint.getSchemaProvider(),
new SimpleSchemaProvider(hoodieSparkContext.jsc(),
targetSchema, props)))
.orElse(dataAndCheckpoint.getSchemaProvider());
- // Rewrite transformed records into the expected target schema
- avroRDDOptional = transformed.map(t -> getTransformedRDD(t,
reconcileSchema, schemaProvider.getTargetSchema()));
+ if (useRowWriter) {
+ inputBatchForWriter = new InputBatch(transformed, checkpointStr,
schemaProvider);
+ } else {
+ // Rewrite transformed records into the expected target schema
+ SchemaProvider finalSchemaProvider = schemaProvider;
+ avroRDDOptional = transformed.map(t -> getTransformedRDD(t,
reconcileSchema, finalSchemaProvider.getTargetSchema()));
+ }
}
} else {
- // Pull the data from the source & prepare the write
- InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
- formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr,
cfg.sourceLimit);
- avroRDDOptional = dataAndCheckpoint.getBatch();
- checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
- schemaProvider = dataAndCheckpoint.getSchemaProvider();
+ if (useRowWriter) {
+ inputBatchForWriter =
formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
+ } else {
+ // Pull the data from the source & prepare the write
+ InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
+ formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr,
cfg.sourceLimit);
+ avroRDDOptional = dataAndCheckpoint.getBatch();
+ checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
+ schemaProvider = dataAndCheckpoint.getSchemaProvider();
+ }
}
- if (!cfg.allowCommitOnNoCheckpointChange && Objects.equals(checkpointStr,
resumeCheckpointStr.orElse(null))) {
- LOG.info("No new data, source checkpoint has not changed. Nothing to
commit. Old checkpoint=("
- + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
- String commitActionType = CommitUtils.getCommitActionType(cfg.operation,
HoodieTableType.valueOf(cfg.tableType));
- hoodieMetrics.updateMetricsForEmptyData(commitActionType);
- return null;
+ if (useRowWriter) {
+ return inputBatchForWriter;
+ } else {
+ return new InputBatch(avroRDDOptional, checkpointStr, schemaProvider);
}
+ }
+ /**
+ * Handles an empty batch from the input.
+ * @param useRowWriter true if using the row writer code path.
+ * @param inputBatch {@link InputBatch} instance to use.
+ * @param checkpointForNextBatch checkpoint to use for the next batch.
+ * @param schemaProvider {@link SchemaProvider} instance of interest.
+ * @return a Pair of InputBatch and Boolean. The boolean is set to true on an
 empty batch.
+ */
+ private Pair<InputBatch, Boolean> handleEmptyBatch(boolean useRowWriter,
InputBatch inputBatch,
+ String checkpointForNextBatch,
SchemaProvider schemaProvider) {
hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking
if input is empty");
- if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
- LOG.info("No new data, perform empty commit.");
- return Pair.of(schemaProvider, Pair.of(checkpointStr,
hoodieSparkContext.emptyRDD()));
- }
-
- boolean shouldCombine = cfg.filterDupes ||
cfg.operation.equals(WriteOperationType.UPSERT);
- Set<String> partitionColumns = getPartitionColumns(props);
- JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
-
- JavaRDD<HoodieRecord> records;
- SerializableSchema avroSchema = new
SerializableSchema(schemaProvider.getTargetSchema());
- SerializableSchema processedAvroSchema = new
SerializableSchema(isDropPartitionColumns() ?
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
- if (recordType == HoodieRecordType.AVRO) {
- records = avroRDD.mapPartitions(
- (FlatMapFunction<Iterator<GenericRecord>, HoodieRecord>)
genericRecordIterator -> {
- if (autoGenerateRecordKeys) {
-
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(TaskContext.getPartitionId()));
-
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
- }
- BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
- List<HoodieRecord> avroRecords = new ArrayList<>();
- while (genericRecordIterator.hasNext()) {
- GenericRecord genRec = genericRecordIterator.next();
- HoodieKey hoodieKey = new
HoodieKey(builtinKeyGenerator.getRecordKey(genRec),
builtinKeyGenerator.getPartitionPath(genRec));
- GenericRecord gr = isDropPartitionColumns() ?
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
- HoodieRecordPayload payload = shouldCombine ?
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
- (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
cfg.sourceOrderingField, false, props.getBoolean(
-
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
- : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
- avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload));
- }
- return avroRecords.iterator();
- });
- } else if (recordType == HoodieRecordType.SPARK) {
- // TODO we should remove it if we can read InternalRow from source.
- records = avroRDD.mapPartitions(itr -> {
- if (autoGenerateRecordKeys) {
- props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
String.valueOf(TaskContext.getPartitionId()));
- props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG,
instantTime);
- }
- BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
- StructType baseStructType =
AvroConversionUtils.convertAvroSchemaToStructType(processedAvroSchema.get());
- StructType targetStructType = isDropPartitionColumns() ?
AvroConversionUtils
-
.convertAvroSchemaToStructType(HoodieAvroUtils.removeFields(processedAvroSchema.get(),
partitionColumns)) : baseStructType;
- HoodieAvroDeserializer deserializer =
SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(processedAvroSchema.get(),
baseStructType);
-
- return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), rec
-> {
- InternalRow row = (InternalRow) deserializer.deserialize(rec).get();
- String recordKey = builtinKeyGenerator.getRecordKey(row,
baseStructType).toString();
- String partitionPath = builtinKeyGenerator.getPartitionPath(row,
baseStructType).toString();
- return new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath),
- HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType,
targetStructType).apply(row), targetStructType, false);
- });
- });
+ if (useRowWriter) {
+ Option<Dataset<Row>> rowDatasetOptional = inputBatch.getBatch();
+ if ((!rowDatasetOptional.isPresent()) ||
(rowDatasetOptional.get().isEmpty())) {
+ LOG.info("No new data, perform empty commit.");
+ return Pair.of(new
InputBatch<>(Option.of(sparkSession.emptyDataFrame()), checkpointForNextBatch,
schemaProvider), true);
+ }
} else {
- throw new UnsupportedOperationException(recordType.name());
+ Option<JavaRDD<GenericRecord>> avroRDDOptional = inputBatch.getBatch();
+ if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty()))
{
+ LOG.info("No new data, perform empty commit.");
+ return Pair.of(new
InputBatch(Option.of(hoodieSparkContext.emptyRDD()), checkpointForNextBatch,
schemaProvider), true);
+ }
}
-
- return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
+ return Pair.of(inputBatch, false);
}
private JavaRDD<GenericRecord> getTransformedRDD(Dataset<Row> rowDataset,
boolean reconcileSchema, Schema readerSchema) {
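
The empty-batch contract of handleEmptyBatch above, in brief (a summary, not
code from this commit):

    // row path : empty Dataset<Row>           -> Pair(emptyDataFrame, true)
    // avro path: empty JavaRDD<GenericRecord> -> Pair(emptyRDD, true)
    // otherwise: Pair(originalBatch, false), and the caller proceeds with the write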
@@ -750,70 +763,44 @@ public class StreamSync implements Serializable,
Closeable {
}).orElse(Option.empty());
}
+ private HoodieWriteConfig prepareHoodieConfigForRowWriter(Schema
writerSchema) {
+ HoodieConfig hoodieConfig = new
HoodieConfig(HoodieStreamer.Config.getProps(fs, cfg));
+ hoodieConfig.setValue(DataSourceWriteOptions.TABLE_TYPE(), cfg.tableType);
+ hoodieConfig.setValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(),
cfg.payloadClassName);
+ hoodieConfig.setValue(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(),
HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(props));
+ hoodieConfig.setValue("path", cfg.targetBasePath);
+ return HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema,
hoodieConfig, cfg.targetBasePath, cfg.targetTableName);
+ }
+
/**
* Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive
if needed.
*
* @param instantTime instant time to use for ingest.
- * @param records Input Records
- * @param checkpointStr Checkpoint String
+ * @param inputBatch input batch that contains the records,
checkpoint, and schema provider
+ * @param inputIsEmpty true if input batch is empty.
* @param metrics Metrics
* @param overallTimerContext Timer Context
* @return Option Compaction instant if one is scheduled
*/
- private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(String
instantTime, JavaRDD<HoodieRecord> records, String checkpointStr,
-
HoodieIngestionMetrics metrics,
- Timer.Context
overallTimerContext) {
+ private Pair<Option<String>, JavaRDD<WriteStatus>>
writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch, boolean
inputIsEmpty,
+
HoodieIngestionMetrics metrics,
+
Timer.Context overallTimerContext) {
Option<String> scheduledCompactionInstant = Option.empty();
- // filter dupes if needed
- if (cfg.filterDupes) {
- records = DataSourceUtils.dropDuplicates(hoodieSparkContext.jsc(),
records, writeClient.getConfig());
- }
-
- boolean isEmpty = records.isEmpty();
- instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
- LOG.info("Starting commit : " + instantTime);
-
- HoodieWriteResult writeResult;
- Map<String, List<String>> partitionToReplacedFileIds =
Collections.emptyMap();
- JavaRDD<WriteStatus> writeStatusRDD;
- switch (cfg.operation) {
- case INSERT:
- writeStatusRDD = writeClient.insert(records, instantTime);
- break;
- case UPSERT:
- writeStatusRDD = writeClient.upsert(records, instantTime);
- break;
- case BULK_INSERT:
- writeStatusRDD = writeClient.bulkInsert(records, instantTime);
- break;
- case INSERT_OVERWRITE:
- writeResult = writeClient.insertOverwrite(records, instantTime);
- partitionToReplacedFileIds =
writeResult.getPartitionToReplaceFileIds();
- writeStatusRDD = writeResult.getWriteStatuses();
- break;
- case INSERT_OVERWRITE_TABLE:
- writeResult = writeClient.insertOverwriteTable(records, instantTime);
- partitionToReplacedFileIds =
writeResult.getPartitionToReplaceFileIds();
- writeStatusRDD = writeResult.getWriteStatuses();
- break;
- case DELETE_PARTITION:
- List<String> partitions = records.map(record ->
record.getPartitionPath()).distinct().collect();
- writeResult = writeClient.deletePartitions(partitions, instantTime);
- partitionToReplacedFileIds =
writeResult.getPartitionToReplaceFileIds();
- writeStatusRDD = writeResult.getWriteStatuses();
- break;
- default:
- throw new HoodieStreamerException("Unknown operation : " +
cfg.operation);
- }
+ // write to hudi and fetch result
+ Pair<WriteClientWriteResult, Boolean> writeClientWriteResultIsEmptyPair =
writeToSink(inputBatch, instantTime, inputIsEmpty);
+ JavaRDD<WriteStatus> writeStatusRDD =
writeClientWriteResultIsEmptyPair.getKey().getWriteStatusRDD();
+ Map<String, List<String>> partitionToReplacedFileIds =
writeClientWriteResultIsEmptyPair.getKey().getPartitionToReplacedFileIds();
+ boolean isEmpty = writeClientWriteResultIsEmptyPair.getRight();
+ // process write status
long totalErrorRecords =
writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
long totalRecords =
writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
boolean hasErrors = totalErrorRecords > 0;
if (!hasErrors || cfg.commitOnErrors) {
HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
if (!getBooleanWithAltKeys(props, CHECKPOINT_FORCE_SKIP)) {
- if (checkpointStr != null) {
- checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
+ if (inputBatch.getCheckpointForNextBatch() != null) {
+ checkpointCommitMetadata.put(CHECKPOINT_KEY,
inputBatch.getCheckpointForNextBatch());
}
if (cfg.checkpoint != null) {
checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint);
@@ -846,7 +833,7 @@ public class StreamSync implements Serializable, Closeable {
boolean success = writeClient.commit(instantTime, writeStatusRDD,
Option.of(checkpointCommitMetadata), commitActionType,
partitionToReplacedFileIds, Option.empty());
if (success) {
LOG.info("Commit " + instantTime + " successful!");
- this.formatAdapter.getSource().onCommit(checkpointStr);
+
this.formatAdapter.getSource().onCommit(inputBatch.getCheckpointForNextBatch());
// Schedule compaction if needed
if (cfg.isAsyncCompactionEnabled()) {
scheduledCompactionInstant =
writeClient.scheduleCompaction(Option.empty());
@@ -913,6 +900,58 @@ public class StreamSync implements Serializable, Closeable
{
throw lastException;
}
+ private Pair<WriteClientWriteResult, Boolean> writeToSink(InputBatch
inputBatch, String instantTime, boolean inputIsEmpty) {
+ WriteClientWriteResult writeClientWriteResult = null;
+ instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
+ boolean isEmpty = inputIsEmpty;
+
+ if (useRowWriter) {
+ Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().get();
+ HoodieWriteConfig hoodieWriteConfig =
prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema());
+ BaseDatasetBulkInsertCommitActionExecutor executor = new
HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig,
writeClient, instantTime);
+ writeClientWriteResult = new WriteClientWriteResult(executor.execute(df,
!HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
+ } else {
+ JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>)
inputBatch.getBatch().get();
+ // filter dupes if needed
+ if (cfg.filterDupes) {
+ records = DataSourceUtils.dropDuplicates(hoodieSparkContext.jsc(),
records, writeClient.getConfig());
+ isEmpty = records.isEmpty();
+ }
+
+ HoodieWriteResult writeResult = null;
+ switch (cfg.operation) {
+ case INSERT:
+ writeClientWriteResult = new
WriteClientWriteResult(writeClient.insert(records, instantTime));
+ break;
+ case UPSERT:
+ writeClientWriteResult = new
WriteClientWriteResult(writeClient.upsert(records, instantTime));
+ break;
+ case BULK_INSERT:
+ writeClientWriteResult = new
WriteClientWriteResult(writeClient.bulkInsert(records, instantTime));
+ break;
+ case INSERT_OVERWRITE:
+ writeResult = writeClient.insertOverwrite(records, instantTime);
+ writeClientWriteResult = new
WriteClientWriteResult(writeResult.getWriteStatuses());
+
writeClientWriteResult.setPartitionToReplacedFileIds(writeResult.getPartitionToReplaceFileIds());
+ break;
+ case INSERT_OVERWRITE_TABLE:
+ writeResult = writeClient.insertOverwriteTable(records, instantTime);
+ writeClientWriteResult = new
WriteClientWriteResult(writeResult.getWriteStatuses());
+
writeClientWriteResult.setPartitionToReplacedFileIds(writeResult.getPartitionToReplaceFileIds());
+ break;
+ case DELETE_PARTITION:
+ List<String> partitions = records.map(record ->
record.getPartitionPath()).distinct().collect();
+ writeResult = writeClient.deletePartitions(partitions, instantTime);
+ writeClientWriteResult = new
WriteClientWriteResult(writeResult.getWriteStatuses());
+
writeClientWriteResult.setPartitionToReplacedFileIds(writeResult.getPartitionToReplaceFileIds());
+ break;
+ default:
+ throw new HoodieStreamerException("Unknown operation : " +
cfg.operation);
+ }
+ }
+ return Pair.of(writeClientWriteResult, isEmpty);
+ }
+
private String getSyncClassShortName(String syncClassName) {
return syncClassName.substring(syncClassName.lastIndexOf(".") + 1);
}
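
A condensed sketch of consuming the new writeToSink result, as
writeToSinkAndDoMetaSync above does:

    Pair<WriteClientWriteResult, Boolean> pair = writeToSink(inputBatch, instantTime, inputIsEmpty);
    JavaRDD<WriteStatus> writeStatusRDD = pair.getKey().getWriteStatusRDD();
    Map<String, List<String>> replacedFileIds = pair.getKey().getPartitionToReplacedFileIds();
    boolean isEmpty = pair.getRight();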
@@ -969,8 +1008,8 @@ public class StreamSync implements Serializable, Closeable
{
private void reInitWriteClient(Schema sourceSchema, Schema targetSchema,
JavaRDD<HoodieRecord> records) throws IOException {
LOG.info("Setting up new Hoodie Write Client");
- if (isDropPartitionColumns()) {
- targetSchema = HoodieAvroUtils.removeFields(targetSchema,
getPartitionColumns(props));
+ if (HoodieStreamerUtils.isDropPartitionColumns(props)) {
+ targetSchema = HoodieAvroUtils.removeFields(targetSchema,
HoodieStreamerUtils.getPartitionColumns(props));
}
registerAvroSchemas(sourceSchema, targetSchema);
final HoodieWriteConfig initialWriteConfig =
getHoodieClientConfig(targetSchema);
@@ -1191,22 +1230,25 @@ public class StreamSync implements Serializable,
Closeable {
}
}
- /**
- * Set based on hoodie.datasource.write.drop.partition.columns config.
- * When set to true, will not write the partition columns into the table.
- */
- private Boolean isDropPartitionColumns() {
- return props.getBoolean(DROP_PARTITION_COLUMNS.key(),
DROP_PARTITION_COLUMNS.defaultValue());
- }
+ class WriteClientWriteResult {
+ private Map<String, List<String>> partitionToReplacedFileIds =
Collections.emptyMap();
+ private JavaRDD<WriteStatus> writeStatusRDD;
- /**
- * Get the partition columns as a set of strings.
- *
- * @param props TypedProperties
- * @return Set of partition columns.
- */
- private Set<String> getPartitionColumns(TypedProperties props) {
- String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
- return
Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
+ public WriteClientWriteResult(JavaRDD<WriteStatus> writeStatusRDD) {
+ this.writeStatusRDD = writeStatusRDD;
+ }
+
+ public Map<String, List<String>> getPartitionToReplacedFileIds() {
+ return partitionToReplacedFileIds;
+ }
+
+ public void setPartitionToReplacedFileIds(Map<String, List<String>>
partitionToReplacedFileIds) {
+ this.partitionToReplacedFileIds = partitionToReplacedFileIds;
+ }
+
+ public JavaRDD<WriteStatus> getWriteStatusRDD() {
+ return writeStatusRDD;
+ }
}
+
}
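
The WriteClientWriteResult wrapper gives replace-commit operations
(INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, DELETE_PARTITION) and plain writes a
single return shape. A minimal sketch of its use, as in writeToSink above:

    WriteClientWriteResult result = new WriteClientWriteResult(writeResult.getWriteStatuses());
    result.setPartitionToReplacedFileIds(writeResult.getPartitionToReplaceFileIds());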
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 94f42e3713a..095a34cf75c 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -399,7 +399,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
}
}
- static List<String> getAsyncServicesConfigs(int totalRecords, String
autoClean, String inlineCluster,
+ static List<String> getTableServicesConfigs(int totalRecords, String
autoClean, String inlineCluster,
String inlineClusterMaxCommit,
String asyncCluster, String asyncClusterMaxCommit) {
List<String> configs = new ArrayList<>();
configs.add(String.format("%s=%d",
SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
@@ -634,7 +634,7 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
boolean ret = false;
while (!ret && !dsFuture.isDone()) {
try {
- Thread.sleep(3000);
+ Thread.sleep(2000);
ret = condition.apply(true);
} catch (Throwable error) {
LOG.warn("Got error :", error);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index d1cb3f1b240..c5133a314fd 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -209,7 +209,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
addRecordMerger(recordType, cfg.configs);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
- cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "",
asyncCluster, ""));
+ cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "",
asyncCluster, ""));
cfg.configs.addAll(getAllMultiWriterConfigs());
customConfigs.forEach(config -> cfg.configs.add(config));
return new HoodieDeltaStreamer(cfg, jsc);
@@ -783,7 +783,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
addRecordMerger(recordType, cfg.configs);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
- cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true",
"2", "", ""));
+ cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "true",
"2", "", ""));
cfg.configs.add(String.format("%s=%s",
"hoodie.datasource.write.row.writer.enable", "false"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
@@ -817,7 +817,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
Option.empty());
// do another ingestion with inline clustering enabled
- cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true",
"2", "", ""));
+ cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "true",
"2", "", ""));
cfg.retryLastPendingInlineClusteringJob = true;
HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
ds2.sync();
@@ -884,7 +884,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
addRecordMerger(recordType, cfg.configs);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
- cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true",
"2", "", ""));
+ cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "true",
"2", "", ""));
cfg.configs.add(String.format("%s=%s",
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
cfg.configs.add(String.format("%s=%s",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
cfg.configs.add(String.format("%s=%s",
HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT"));
@@ -934,7 +934,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
assertFalse(replacedFilePaths.isEmpty());
// Step 4 : Add commits with insert of 1 record and trigger sync/async
cleaner and archive.
- List<String> configs = getAsyncServicesConfigs(1, "true", "true", "6", "",
"");
+ List<String> configs = getTableServicesConfigs(1, "true", "true", "6", "",
"");
configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_POLICY.key(),
"KEEP_LATEST_COMMITS"));
configs.add(String.format("%s=%s",
HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
configs.add(String.format("%s=%s",
HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "4"));
@@ -1129,7 +1129,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
addRecordMerger(recordType, cfg.configs);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
- cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "",
"true", "3"));
+ cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "",
"true", "3"));
cfg.configs.add(String.format("%s=%s",
"hoodie.datasource.write.row.writer.enable", "false"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
@@ -1165,7 +1165,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
addRecordMerger(recordType, cfg.configs);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
- cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "",
"true", "2"));
+ cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "",
"true", "2"));
cfg.configs.add(String.format("%s=%s",
"hoodie.datasource.write.row.writer.enable", "false"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
@@ -1193,7 +1193,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
addRecordMerger(recordType, cfg.configs);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
- cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "",
"true", "3"));
+ cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "", "",
"true", "3"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, fs);
@@ -1218,7 +1218,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
addRecordMerger(recordType, cfg.configs);
cfg.continuousMode = false;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
- cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "false",
"0", "false", "0"));
+ cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "false",
"0", "false", "0"));
cfg.configs.addAll(getAllMultiWriterConfigs());
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
@@ -1306,6 +1306,152 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
}
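+ // The four tests below exercise the bulk insert row writer across every
+ // combination of schema provider (none / provided) and transformer (none /
+ // TripsWithDistanceTransformer).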
+ @Test
+ public void testBulkInsertRowWriterNoSchemaProviderNoTransformer() throws
Exception {
+ testBulkInsertRowWriterMultiBatches(false, null);
+ }
+
+ @Test
+ public void testBulkInsertRowWriterNoSchemaProviderWithTransformer() throws Exception {
+ testBulkInsertRowWriterMultiBatches(false,
Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
+ }
+
+ @Test
+ public void testBulkInsertRowWriterWithSchemaProviderAndNoTransformer()
throws Exception {
+ testBulkInsertRowWriterMultiBatches(true, null);
+ }
+
+ @Test
+ public void testBulkInsertRowWriterWithSchemaProviderAndTransformer() throws
Exception {
+ testBulkInsertRowWriterMultiBatches(true,
Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
+ }
+
+ @Test
+ public void testBulkInsertRowWriterForEmptyBatch() throws Exception {
+ testBulkInsertRowWriterMultiBatches(false, null, true);
+ }
+
+ private void testBulkInsertRowWriterMultiBatches(boolean useSchemaProvider,
List<String> transformerClassNames) throws Exception {
+ testBulkInsertRowWriterMultiBatches(useSchemaProvider,
transformerClassNames, false);
+ }
+
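+ /**
+  * Syncs multiple bulk insert batches with the row writer enabled, asserting the
+  * record count after every sync and, for the first and last looped batch, that
+  * each completed commit carries a valid schema in its extra metadata.
+  */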
+ private void testBulkInsertRowWriterMultiBatches(boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
+ PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+ int parquetRecordsCount = 100;
+ boolean hasTransformer = transformerClassNames != null &&
!transformerClassNames.isEmpty();
+ prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT,
FIRST_PARQUET_FILE_NAME, false, null, null);
+ prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc",
"target.avsc", PROPS_FILENAME_TEST_PARQUET,
+ PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" :
"");
+
+ String tableBasePath = basePath + "/test_parquet_table" + testNum;
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT, testEmptyBatch ?
TestParquetDFSSourceEmptyBatch.class.getName()
+ : ParquetDFSSource.class.getName(),
+ transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
+ useSchemaProvider, 100000, false, null, null, "timestamp", null);
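+ // the switch under test: enables the bulk insert row writer, which writes
+ // Spark Rows directly instead of first converting records to Avro.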
+ cfg.configs.add(DataSourceWriteOptions.ENABLE_ROW_WRITER().key() +
"=true");
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
+ deltaStreamer.sync();
+ assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+
+ try {
+ if (testEmptyBatch) {
+ prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false,
null, null);
+ deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
+ deltaStreamer.sync();
+ // one batch is mimicked as empty; both source files should nonetheless be reflected in the final count.
+ assertRecordCount(200, tableBasePath, sqlContext);
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
+
+ // validate that schema resolution skips the empty commit and fetches a valid schema from the last-but-one commit.
+ TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(metaClient);
+ assertNotEquals(Schema.create(Schema.Type.NULL).toString(), tableSchemaResolver.getTableAvroSchema().toString());
+ }
+
+ int recordsSoFar = testEmptyBatch ? 200 : 100;
+
+ // add 3 more batches and ensure all commits succeed.
+ for (int i = 2; i < 5; i++) {
+ prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, Integer.toString(i) +
".parquet", false, null, null);
+ deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
+ deltaStreamer.sync();
+ assertRecordCount(recordsSoFar + (i - 1) * 100, tableBasePath,
sqlContext);
+ if (i == 2 || i == 4) { // this validation reloads the timeline, so we run it only for the first and last batch.
+ // validate that every completed commit carries a valid schema in its extra metadata.
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
+
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry
-> assertValidSchemaInCommitMetadata(entry, metaClient));
+ }
+ }
+ } finally {
+ deltaStreamer.shutdownGracefully();
+ }
+ testNum++;
+ }
+
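+ // The two continuous mode tests below differ only in whether clustering runs
+ // asynchronously alongside ingestion or inline between ingestion commits.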
+ @Test
+ public void testBulkInsertRowWriterContinuousModeWithAsyncClustering()
throws Exception {
+ testBulkInsertRowWriterContinuousMode(false, null, false,
+ getTableServicesConfigs(2000, "false", "", "", "true", "3"));
+ }
+
+ @Test
+ public void testBulkInsertRowWriterContinuousModeWithInlineClustering()
throws Exception {
+ testBulkInsertRowWriterContinuousMode(false, null, false,
+ getTableServicesConfigs(2000, "false", "true", "3", "false", ""));
+ }
+
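+ /**
+  * Runs the streamer in continuous mode with the row writer enabled while a
+  * background thread keeps generating parquet input, then waits for at least one
+  * replace (clustering) commit before shutting everything down.
+  */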
+ private void testBulkInsertRowWriterContinuousMode(boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch, List<String> customConfigs) throws Exception {
+ PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+ int parquetRecordsCount = 100;
+ boolean hasTransformer = transformerClassNames != null &&
!transformerClassNames.isEmpty();
+ prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT,
FIRST_PARQUET_FILE_NAME, false, null, null);
+ prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc",
"target.avsc", PROPS_FILENAME_TEST_PARQUET,
+ PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" :
"");
+
+ // generate data asynchronously.
+ Future<?> inputGenerationFuture = Executors.newSingleThreadExecutor().submit(() -> {
+ try {
+ int counter = 2;
+ while (counter < 100) { // keep going; the future is cancelled in the finally block if the test times out, so it is safe to generate up to 100 batches.
+ LOG.info("Generating data for batch " + counter);
+ prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT,
Integer.toString(counter) + ".parquet", false, null, null);
+ counter++;
+ Thread.sleep(2000);
+ }
+ } catch (Exception ex) {
+ LOG.warn("Input data generation failed", ex.getMessage());
+ throw new RuntimeException(ex.getMessage(), ex);
+ }
+ });
+
+ // initialize configs for continuous ds
+ String tableBasePath = basePath + "/test_parquet_table" + testNum;
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.BULK_INSERT, testEmptyBatch ?
TestParquetDFSSourceEmptyBatch.class.getName()
+ : ParquetDFSSource.class.getName(),
+ transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
+ useSchemaProvider, 100000, false, null, null, "timestamp", null);
+ cfg.continuousMode = true;
+ cfg.configs.add(DataSourceWriteOptions.ENABLE_ROW_WRITER().key() +
"=true");
+ cfg.configs.addAll(customConfigs);
+
+ HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+ // trigger continuous DS and wait until 1 replace commit is complete.
+ try {
+ deltaStreamerTestRunner(ds, cfg, (r) -> {
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
+ return true;
+ });
+ // There should be at least 4 commits, one of which is a replace commit.
+ TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
+ } finally {
+ // clean up resources
+ ds.shutdownGracefully();
+ inputGenerationFuture.cancel(true);
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
+ }
+ testNum++;
+ }
+
/**
* Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental
processing using a 2 step pipeline The first
* step involves using a SQL template to transform a source TEST-DATA-SOURCE
============================> HUDI TABLE
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java
index 53e1733c9a6..48a8a7100ff 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java
@@ -61,7 +61,7 @@ public class TestHoodieDeltaStreamerDAGExecution extends
HoodieDeltaStreamerTest
// Configure 3 transformers of same type. 2nd transformer has no suffix
StageListener stageListener = new
StageListener("org.apache.hudi.table.action.commit.BaseCommitActionExecutor.executeClustering");
sparkSession.sparkContext().addSparkListener(stageListener);
- List<String> configs = getAsyncServicesConfigs(100, "false", "true", "1",
"", "");
+ List<String> configs = getTableServicesConfigs(100, "false", "true", "1",
"", "");
runDeltaStreamer(WriteOperationType.UPSERT, false, Option.of(configs));
assertEquals(1, stageListener.triggerCount);
}