This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 57d5da6 [HUDI-2330][HUDI-2335] Adding support for merge-on-read tables (#3679)
57d5da6 is described below
commit 57d5da68aa646adfd327394daea720cb3d592b1d
Author: vinoth chandar <[email protected]>
AuthorDate: Thu Sep 16 15:24:34 2021 -0700
[HUDI-2330][HUDI-2335] Adding support for merge-on-read tables (#3679)
- Inserts go into logs, hashed by Kafka and Hudi partitions
- Fixed issues with the setupKafka script
- Bumped up the default commit interval to 300 seconds
- Minor renaming
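
In short, every incoming record is pinned to a deterministic file ID derived from its Kafka partition and its Hudi partition path, so the upsert-prepped delta commit appends it to the matching log file. A minimal sketch of that tagging, condensed from the AbstractConnectWriter change in this diff (here `key`, `payload`, `record`, and `instantTime` stand in for values the connect task supplies):

```java
// Sketch only: pin a record to a log file chosen by (kafka partition, hudi partition).
HoodieRecord<?> hoodieRecord = new HoodieRecord<>(key, payload);
String fileId = KafkaConnectUtils.hashDigest(
    String.format("%s-%s", record.kafkaPartition(), hoodieRecord.getPartitionPath()));
hoodieRecord.unseal();
hoodieRecord.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId));
hoodieRecord.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
hoodieRecord.seal();
```

Because each record arrives with a known location, the write path treats it as an update and routes it into that file's log rather than a base file.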
---
.../java/org/apache/hudi/client/WriteStatus.java | 1 +
.../hudi/table/HoodieJavaMergeOnReadTable.java | 28 ++++-
.../org/apache/hudi/table/HoodieJavaTable.java | 5 +-
.../BaseJavaDeltaCommitActionExecutor.java | 35 ++++++
...JavaUpsertPreppedDeltaCommitActionExecutor.java | 102 ++++++++++++++++
hudi-kafka-connect/README.md | 8 +-
hudi-kafka-connect/demo/config-sink.json | 3 +-
hudi-kafka-connect/demo/setupKafka.sh | 131 ++++++++++++---------
.../connect/KafkaConnectFileIdPrefixProvider.java | 21 +---
.../hudi/connect/utils/KafkaConnectUtils.java | 17 +++
.../connect/writers/AbstractConnectWriter.java | 22 +++-
.../connect/writers/BufferedConnectWriter.java | 35 ++++--
.../hudi/connect/writers/KafkaConnectConfigs.java | 2 +-
.../writers/KafkaConnectTransactionServices.java | 21 +---
.../hudi/writers/TestAbstractConnectWriter.java | 6 +-
.../hudi/writers/TestBufferedConnectWriter.java | 2 +-
16 files changed, 315 insertions(+), 124 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index a731384..8f74858 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -201,6 +201,7 @@ public class WriteStatus implements Serializable {
public String toString() {
final StringBuilder sb = new StringBuilder("WriteStatus {");
sb.append("fileId=").append(fileId);
+ sb.append(", writeStat=").append(stat);
sb.append(", globalError='").append(globalError).append('\'');
sb.append(", hasErrors='").append(hasErrors()).append('\'');
sb.append(", errorCount='").append(totalErrorRecords).append('\'');
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
index 4995af0..a78b71b 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
@@ -18,14 +18,40 @@
package org.apache.hudi.table;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.JavaUpsertPreppedDeltaCommitActionExecutor;
+
+import java.util.List;
public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieJavaCopyOnWriteTable<T> {
protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config,
HoodieEngineContext context, HoodieTableMetaClient metaClient) {
super(config, context, metaClient);
}
- // TODO not support yet.
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> preppedRecords) {
+    return new JavaUpsertPreppedDeltaCommitActionExecutor<>((HoodieJavaEngineContext) context, config, this, instantTime, preppedRecords).execute();
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+    return new JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context, config, this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
+  }
}
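
With MERGE_ON_READ now resolvable from HoodieJavaTable (next file below), callers of the Java write client can exercise this delta-commit path directly. A hedged usage sketch, mirroring how BufferedConnectWriter invokes it later in this diff (`writeClient`, `preppedRecords`, and `instantTime` are assumed to be set up by the caller):

```java
// Records already tagged with a HoodieRecordLocation are treated as updates and
// appended to log files via JavaUpsertPreppedDeltaCommitActionExecutor.
List<WriteStatus> statuses = writeClient.upsertPreppedRecords(preppedRecords, instantTime);
```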
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index 219dec4..9cf9a6d 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -29,9 +29,8 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.index.JavaHoodieIndex;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.JavaHoodieIndex;
import java.util.List;
@@ -56,7 +55,7 @@ public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
case COPY_ON_WRITE:
return new HoodieJavaCopyOnWriteTable<>(config, context, metaClient);
case MERGE_ON_READ:
-      throw new HoodieNotSupportedException("MERGE_ON_READ is not supported yet");
+ return new HoodieJavaMergeOnReadTable<>(config, context, metaClient);
default:
      throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseJavaDeltaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseJavaDeltaCommitActionExecutor.java
new file mode 100644
index 0000000..0b4a654
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseJavaDeltaCommitActionExecutor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor;
+
+public abstract class BaseJavaDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
+
+  public BaseJavaDeltaCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType) {
+ super(context, config, table, instantTime, operationType);
+ }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/JavaUpsertPreppedDeltaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/JavaUpsertPreppedDeltaCommitActionExecutor.java
new file mode 100644
index 0000000..f6faa28
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/JavaUpsertPreppedDeltaCommitActionExecutor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.io.HoodieAppendHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.JavaBulkInsertHelper;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+public class JavaUpsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaDeltaCommitActionExecutor<T> {
+
+  private static final Logger LOG = LogManager.getLogger(JavaUpsertPreppedDeltaCommitActionExecutor.class);
+
+ private final List<HoodieRecord<T>> preppedInputRecords;
+
+  public JavaUpsertPreppedDeltaCommitActionExecutor(HoodieJavaEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, List<HoodieRecord<T>> preppedInputRecords) {
+    super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
+ this.preppedInputRecords = preppedInputRecords;
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
+ // First group by target file id.
+    HashMap<Pair<String, String>, List<HoodieRecord<T>>> recordsByFileId = new HashMap<>();
+ List<HoodieRecord<T>> insertedRecords = new LinkedList<>();
+
+ // Split records into inserts and updates.
+ for (HoodieRecord<T> record : preppedInputRecords) {
+ if (!record.isCurrentLocationKnown()) {
+ insertedRecords.add(record);
+ } else {
+        Pair<String, String> fileIdPartitionPath = Pair.of(record.getCurrentLocation().getFileId(), record.getPartitionPath());
+ if (!recordsByFileId.containsKey(fileIdPartitionPath)) {
+ recordsByFileId.put(fileIdPartitionPath, new LinkedList<>());
+ }
+ recordsByFileId.get(fileIdPartitionPath).add(record);
+ }
+ }
+    LOG.info(String.format("Total update fileIDs %s, total inserts %s for commit %s",
+ recordsByFileId.size(), insertedRecords.size(), instantTime));
+
+ List<WriteStatus> allWriteStatuses = new ArrayList<>();
+ try {
+ recordsByFileId.forEach((k, v) -> {
+        HoodieAppendHandle<?, ?, ?, ?> appendHandle = new HoodieAppendHandle(config, instantTime, table,
+ k.getRight(), k.getLeft(), v.iterator(), taskContextSupplier);
+ appendHandle.doAppend();
+ allWriteStatuses.addAll(appendHandle.close());
+ });
+
+ if (insertedRecords.size() > 0) {
+        HoodieWriteMetadata<List<WriteStatus>> insertResult = JavaBulkInsertHelper.newInstance()
+            .bulkInsert(insertedRecords, instantTime, table, config, this, false, Option.empty());
+ allWriteStatuses.addAll(insertResult.getWriteStatuses());
+ }
+ } catch (Throwable e) {
+ if (e instanceof HoodieUpsertException) {
+ throw e;
+ }
+      throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
+ }
+
+ updateIndex(allWriteStatuses, result);
+ return result;
+ }
+}
diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md
index 85fc009..584fddf 100644
--- a/hudi-kafka-connect/README.md
+++ b/hudi-kafka-connect/README.md
@@ -70,9 +70,9 @@ Wait until the kafka cluster is up and running.
### 2 - Set up the schema registry
-Hudi leverages schema registry to obtain the latest schema when writing records. While it supports most popular schema registries,
-we use Confluent schema registry. Download the latest confluent schema registry code from https://github.com/confluentinc/schema-registry
-and start the schema registry service.
+Hudi leverages a schema registry to obtain the latest schema when writing records. While it supports most popular schema
+registries, we use the Confluent schema registry. Download the latest Confluent platform and run the schema registry
+service.
```bash
cd $CONFLUENT_DIR
@@ -120,7 +120,7 @@ that can be changed based on the desired properties.
```bash
curl -X DELETE http://localhost:8083/connectors/hudi-sink
-curl -X POST -H "Content-Type:application/json" -d @$HUDI-DIR/hudi-kafka-connect/demo/config-sink.json http://localhost:8083/connectors
+curl -X POST -H "Content-Type:application/json" -d @${HUDI_DIR}/hudi-kafka-connect/demo/config-sink.json http://localhost:8083/connectors
```
Now, you should see that the connector is created and tasks are running.
diff --git a/hudi-kafka-connect/demo/config-sink.json b/hudi-kafka-connect/demo/config-sink.json
index 75e6d84..2d2be00 100644
--- a/hudi-kafka-connect/demo/config-sink.json
+++ b/hudi-kafka-connect/demo/config-sink.json
@@ -9,10 +9,11 @@
"value.converter.schemas.enable": "false",
"topics": "hudi-test-topic",
"hoodie.table.name": "hudi-test-topic",
+ "hoodie.table.type": "MERGE_ON_READ",
"hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
"hoodie.datasource.write.recordkey.field": "volume",
"hoodie.datasource.write.partitionpath.field": "date",
"hoodie.schemaprovider.class":
"org.apache.hudi.schema.SchemaRegistryProvider",
"hoodie.deltastreamer.schemaprovider.registry.url":
"http://localhost:8081/subjects/hudi-test-topic/versions/latest"
- }
+ }
}
diff --git a/hudi-kafka-connect/demo/setupKafka.sh b/hudi-kafka-connect/demo/setupKafka.sh
old mode 100644
new mode 100755
index f2c1735..81968a4
--- a/hudi-kafka-connect/demo/setupKafka.sh
+++ b/hudi-kafka-connect/demo/setupKafka.sh
@@ -16,38 +16,33 @@
#!/bin/bash
-## Directories
-HOME_DIR=~
-HUDI_DIR=${HOME_DIR}/hudi
-KAFKA_HOME=${HOME_DIR}/kafka
-
#########################
# The command line help #
#########################
usage() {
- echo "Usage: $0"
- echo " -n |--num-kafka-records, (required) number of kafka records to
generate"
- echo " -f |--raw-file, (optional) raw file for the kafka records"
- echo " -k |--kafka-topic, (optional) Topic name for Kafka"
- echo " -m |--num-kafka-partitions, (optional) number of kafka partitions"
- echo " -r |--record-key, (optional) field to use as record key"
- echo " -l |--num-hudi-partitions, (optional) number of hudi partitions"
- echo " -p |--partition-key, (optional) field to use as partition"
- echo " -s |--schema-file, (optional) path of the file containing the
schema of the records"
- exit 1
+ echo "Usage: $0"
+ echo " -n |--num-kafka-records, (required) number of kafka records to
generate"
+ echo " -f |--raw-file, (optional) raw file for the kafka records"
+ echo " -k |--kafka-topic, (optional) Topic name for Kafka"
+ echo " -m |--num-kafka-partitions, (optional) number of kafka partitions"
+ echo " -r |--record-key, (optional) field to use as record key"
+ echo " -l |--num-hudi-partitions, (optional) number of hudi partitions"
+ echo " -p |--partition-key, (optional) field to use as partition"
+ echo " -s |--schema-file, (optional) path of the file containing the
schema of the records"
+ exit 1
}
case "$1" in
- --help)
- usage
- exit 0
- ;;
+--help)
+ usage
+ exit 0
+ ;;
esac
if [ $# -lt 1 ]; then
- echo "Illegal number of parameters"
- usage
- exit 0
+ echo "Illegal number of parameters"
+ usage
+ exit 0
fi
## defaults
@@ -61,71 +56,91 @@ schemaFile=${HUDI_DIR}/docker/demo/config/schema.avsc
while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
case $opt in
-  n) num_records="$OPTARG"
+  n)
+    num_records="$OPTARG"
    printf "Argument num-kafka-records is %s\n" "$num_records"
    ;;
-  k) rawDataFile="$OPTARG"
+  f)
+    rawDataFile="$OPTARG"
    printf "Argument raw-file is %s\n" "$rawDataFile"
    ;;
-  f) kafkaTopicName="$OPTARG"
+  k)
+    kafkaTopicName="$OPTARG"
    printf "Argument kafka-topic is %s\n" "$kafkaTopicName"
    ;;
-  m) numKafkaPartitions="$OPTARG"
+  m)
+    numKafkaPartitions="$OPTARG"
    printf "Argument num-kafka-partitions is %s\n" "$numKafkaPartitions"
    ;;
-  r) recordKey="$OPTARG"
+  r)
+    recordKey="$OPTARG"
    printf "Argument record-key is %s\n" "$recordKey"
    ;;
-  l) numHudiPartitions="$OPTARG"
+  l)
+    numHudiPartitions="$OPTARG"
    printf "Argument num-hudi-partitions is %s\n" "$numHudiPartitions"
    ;;
-  p) partitionField="$OPTARG"
+  p)
+    partitionField="$OPTARG"
    printf "Argument partition-key is %s\n" "$partitionField"
    ;;
-  p) schemaFile="$OPTARG"
+  s)
+    schemaFile="$OPTARG"
    printf "Argument schema-file is %s\n" "$schemaFile"
    ;;
-  -) echo "Invalid option -$OPTARG" >&2
+  -)
+    echo "Invalid option -$OPTARG" >&2
    ;;
-esac
+  esac
done
# First delete the existing topic
-$KAFKA_HOME/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
+#${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
# Create the topic with 4 partitions
-$KAFKA_HOME/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
-
+#${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
# Setup the schema registry
-export SCHEMA=`sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring`
+export SCHEMA=$(sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring)
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schema\": $SCHEMA}" http://localhost:8081/subjects/${kafkaTopicName}/versions
curl -X GET http://localhost:8081/subjects/${kafkaTopicName}/versions/latest
-
# Generate kafka messages from raw records
# Each record gets a unique key; messages are generated evenly across the hudi partitions
partitions=()
-for ((i=0; i<${numHudiPartitions}; i++))
-do
-  partitions[$i]="partition-"$i;
+for ((i = 0; i < ${numHudiPartitions}; i++)); do
+  partitions[$i]="partition-"$i
+done
+
+events_file=/tmp/kcat-input.events
+rm -f ${events_file}
+
+recordValue=0
+num_records=$((num_records + 0))
+
+for (( ; ; )); do
+  while IFS= read line; do
+    for partitionValue in "${partitions[@]}"; do
+      echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' >>${events_file}
+      ((recordValue = recordValue + 1))
+
+      if [ $recordValue -gt $num_records ]; then
+        break
+      fi
+    done
+
+    if [ $recordValue -gt $num_records ]; then
+      break
+    fi
+
+    if [ $(($recordValue % 1000)) -eq 0 ]; then
+      sleep 1
+    fi
+  done <"$rawDataFile"
+
+  if [ $recordValue -gt $num_records ]; then
+    break
+  fi
+done
-for ((recordValue=0; recordValue<=${num_records}; ))
-do
-  while IFS= read line
-  do
-    for partitionValue in "${partitions[@]}"
-    do
-      echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' | kafkacat -P -b localhost:9092 -t hudi-test-topic;
-      ((recordValue++));
-      if [ $recordValue -gt ${num_records} ]; then
-        exit 0
-      fi
-    done
-
-    if [ $(( $recordValue % 1000 )) -eq 0 ]
-    then sleep 1
-    fi
-  done < "$rawDataFile"
-done
+grep -v '^$' ${events_file} | kcat -P -b localhost:9092 -t hudi-test-topic
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
index 536ad4a..9c46747 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
@@ -18,17 +18,13 @@
package org.apache.hudi.connect;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.FileIdPrefixProvider;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Objects;
import java.util.Properties;
public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {
@@ -52,18 +48,9 @@ public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {
    // We use a combination of kafka partition and partition path as the file id, and then hash it
// to generate a fixed sized hash.
String rawFileIdPrefix = kafkaPartition + partitionPath;
- MessageDigest md;
- try {
- md = MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e) {
- LOG.error("Fatal error selecting hash algorithm", e);
- throw new HoodieException(e);
- }
-
-    byte[] digest = Objects.requireNonNull(md).digest(rawFileIdPrefix.getBytes(StandardCharsets.UTF_8));
-
+ String hashedPrefix = KafkaConnectUtils.hashDigest(rawFileIdPrefix);
LOG.info("CreateFileId for Kafka Partition " + kafkaPartition + " : " +
partitionPath + " = " + rawFileIdPrefix
- + " === " + StringUtils.toHexString(digest).toUpperCase());
- return StringUtils.toHexString(digest).toUpperCase();
+ + " === " + hashedPrefix);
+ return hashedPrefix;
}
}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index 593cfb1..34a44c8 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
@@ -41,8 +42,12 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -137,4 +142,16 @@ public class KafkaConnectUtils {
return Option.empty();
}
}
+
+ public static String hashDigest(String stringToHash) {
+ MessageDigest md;
+ try {
+ md = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ LOG.error("Fatal error selecting hash algorithm", e);
+ throw new HoodieException(e);
+ }
+    byte[] digest = Objects.requireNonNull(md).digest(stringToHash.getBytes(StandardCharsets.UTF_8));
+ return StringUtils.toHexString(digest).toUpperCase();
+ }
}
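
The extracted hashDigest helper centralizes the MD5 hashing that was previously inlined in KafkaConnectFileIdPrefixProvider, so the prefix provider and the connect writer derive identical file IDs. A small illustrative call, assuming kafka partition 3 and hudi partition "partition-0" as inputs:

```java
// Returns a fixed-size, upper-case hex MD5 digest used as a file ID prefix.
String fileId = KafkaConnectUtils.hashDigest("3-partition-0");
```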
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
index 3d8e5f8..9888fd1 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
@@ -21,7 +21,9 @@ package org.apache.hudi.connect.writers;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
@@ -46,17 +48,19 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
  public static final String KAFKA_JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
  public static final String KAFKA_STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter";
  private static final Logger LOG = LogManager.getLogger(AbstractConnectWriter.class);
+ protected final String instantTime;
- private final KafkaConnectConfigs connectConfigs;
private final KeyGenerator keyGenerator;
private final SchemaProvider schemaProvider;
+ protected final KafkaConnectConfigs connectConfigs;
public AbstractConnectWriter(KafkaConnectConfigs connectConfigs,
KeyGenerator keyGenerator,
- SchemaProvider schemaProvider) {
+                               SchemaProvider schemaProvider, String instantTime) {
this.connectConfigs = connectConfigs;
this.keyGenerator = keyGenerator;
this.schemaProvider = schemaProvider;
+ this.instantTime = instantTime;
}
@Override
@@ -76,16 +80,22 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
      throw new IOException("Unsupported Kafka Format type (" + connectConfigs.getKafkaValueConverter() + ")");
}
-    HoodieRecord hoodieRecord = new HoodieRecord<>(keyGenerator.getKey(avroRecord.get()), new HoodieAvroPayload(avroRecord));
+    // Tag records with a file ID based on kafka partition and hudi partition.
+    HoodieRecord<?> hoodieRecord = new HoodieRecord<>(keyGenerator.getKey(avroRecord.get()), new HoodieAvroPayload(avroRecord));
+    String fileId = KafkaConnectUtils.hashDigest(String.format("%s-%s", record.kafkaPartition(), hoodieRecord.getPartitionPath()));
+    hoodieRecord.unseal();
+    hoodieRecord.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId));
+    hoodieRecord.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
+    hoodieRecord.seal();
writeHudiRecord(hoodieRecord);
}
@Override
public List<WriteStatus> close() throws IOException {
- return flushHudiRecords();
+ return flushRecords();
}
-  protected abstract void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record);
+ protected abstract void writeHudiRecord(HoodieRecord<?> record);
- protected abstract List<WriteStatus> flushHudiRecords() throws IOException;
+ protected abstract List<WriteStatus> flushRecords() throws IOException;
}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
index a60293d..0449f07 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
@@ -21,8 +21,9 @@ package org.apache.hudi.connect.writers;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
@@ -39,8 +40,8 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
-import java.util.stream.Collectors;
/**
* Specific implementation of a Hudi Writer that buffers all incoming records,
@@ -52,9 +53,8 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
private final HoodieEngineContext context;
private final HoodieJavaWriteClient writeClient;
- private final String instantTime;
private final HoodieWriteConfig config;
-  private ExternalSpillableMap<String, HoodieRecord<HoodieAvroPayload>> bufferedRecords;
+ private ExternalSpillableMap<String, HoodieRecord<?>> bufferedRecords;
public BufferedConnectWriter(HoodieEngineContext context,
HoodieJavaWriteClient writeClient,
@@ -63,10 +63,9 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
HoodieWriteConfig config,
KeyGenerator keyGenerator,
SchemaProvider schemaProvider) {
- super(connectConfigs, keyGenerator, schemaProvider);
+ super(connectConfigs, keyGenerator, schemaProvider, instantTime);
this.context = context;
this.writeClient = writeClient;
- this.instantTime = instantTime;
this.config = config;
init();
}
@@ -88,12 +87,12 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
}
@Override
- public void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record) {
+ public void writeHudiRecord(HoodieRecord<?> record) {
bufferedRecords.put(record.getRecordKey(), record);
}
@Override
- public List<WriteStatus> flushHudiRecords() throws IOException {
+ public List<WriteStatus> flushRecords() throws IOException {
try {
LOG.info("Number of entries in MemoryBasedMap => "
+ bufferedRecords.getInMemoryMapNumEntries()
@@ -102,15 +101,25 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
          + bufferedRecords.getDiskBasedMapNumEntries() + "Size of file spilled to disk => "
+ bufferedRecords.getSizeOfFileOnDiskInBytes());
List<WriteStatus> writeStatuses = new ArrayList<>();
+
+      boolean isMorTable = Option.ofNullable(connectConfigs.getString(HoodieTableConfig.TYPE))
+ .map(t -> t.equals(HoodieTableType.MERGE_ON_READ.name()))
+ .orElse(false);
+
// Write out all records if non-empty
if (!bufferedRecords.isEmpty()) {
- writeStatuses = writeClient.bulkInsertPreppedRecords(
- bufferedRecords.values().stream().collect(Collectors.toList()),
- instantTime, Option.empty());
+ if (isMorTable) {
+ writeStatuses = writeClient.upsertPreppedRecords(
+ new LinkedList<>(bufferedRecords.values()),
+ instantTime);
+ } else {
+ writeStatuses = writeClient.bulkInsertPreppedRecords(
+ new LinkedList<>(bufferedRecords.values()),
+ instantTime, Option.empty());
+ }
}
bufferedRecords.close();
- LOG.info("Flushed hudi records and got writeStatuses: "
- + writeStatuses);
+ LOG.info("Flushed hudi records and got writeStatuses: " + writeStatuses);
return writeStatuses;
} catch (Exception e) {
throw new IOException("Write records failed", e);
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
index ae6b5d1..e5662bd 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
@@ -67,7 +67,7 @@ public class KafkaConnectConfigs extends HoodieConfig {
  public static final ConfigProperty<String> COORDINATOR_WRITE_TIMEOUT_SECS = ConfigProperty
.key("hoodie.kafka.coordinator.write.timeout.secs")
- .defaultValue("60")
+ .defaultValue("300")
.withDocumentation("The timeout after sending an END_COMMIT until when "
+ "the coordinator will wait for the write statuses from all the
partitions"
+ "to ignore the current commit and start a new commit.");
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index ad40ebc..8039e56 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -23,12 +23,10 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
@@ -38,7 +36,6 @@ import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -54,19 +51,16 @@ import java.util.Map;
public class KafkaConnectTransactionServices implements ConnectTransactionServices {
  private static final Logger LOG = LogManager.getLogger(KafkaConnectTransactionServices.class);
- private static final String TABLE_FORMAT = "PARQUET";
private final Option<HoodieTableMetaClient> tableMetaClient;
private final Configuration hadoopConf;
- private final FileSystem fs;
private final String tableBasePath;
private final String tableName;
private final HoodieEngineContext context;
private final HoodieJavaWriteClient<HoodieAvroPayload> javaClient;
- public KafkaConnectTransactionServices(
- KafkaConnectConfigs connectConfigs) throws HoodieException {
+  public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throws HoodieException {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withProperties(connectConfigs.getProps()).build();
@@ -74,29 +68,25 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
tableName = writeConfig.getTableName();
hadoopConf = KafkaConnectUtils.getDefaultHadoopConf();
context = new HoodieJavaEngineContext(hadoopConf);
- fs = FSUtils.getFs(tableBasePath, hadoopConf);
try {
      KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(
          new TypedProperties(connectConfigs.getProps()));
-
      String recordKeyFields = KafkaConnectUtils.getRecordKeyColumns(keyGenerator);
      String partitionColumns = KafkaConnectUtils.getPartitionColumns(keyGenerator,
          new TypedProperties(connectConfigs.getProps()));
-      LOG.info(String.format("Setting record key %s and partitionfields %s for table %s",
-          recordKeyFields,
-          partitionColumns,
-          tableBasePath + tableName));
+      LOG.info(String.format("Setting record key %s and partition fields %s for table %s",
+          recordKeyFields, partitionColumns, tableBasePath + tableName));
tableMetaClient = Option.of(HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE.name())
.setTableName(tableName)
.setPayloadClassName(HoodieAvroPayload.class.getName())
- .setBaseFileFormat(TABLE_FORMAT)
.setRecordKeyFields(recordKeyFields)
.setPartitionFields(partitionColumns)
.setKeyGeneratorClassProp(writeConfig.getKeyGeneratorClass())
+ .fromProperties(connectConfigs.getProps())
.initTable(hadoopConf, tableBasePath));
javaClient = new HoodieJavaWriteClient<>(context, writeConfig);
@@ -113,8 +103,7 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
}
  public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
- javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata),
- HoodieActiveTimeline.COMMIT_ACTION, Collections.emptyMap());
+ javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
LOG.info("Ending Hudi commit " + commitTime);
}
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
index 3ca64c3..c8a3ad6 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
@@ -148,7 +148,7 @@ public class TestAbstractConnectWriter {
private List<HoodieRecord> writtenRecords;
    public AbstractHudiConnectWriterTestWrapper(KafkaConnectConfigs connectConfigs, KeyGenerator keyGenerator, SchemaProvider schemaProvider) {
- super(connectConfigs, keyGenerator, schemaProvider);
+ super(connectConfigs, keyGenerator, schemaProvider, "000");
writtenRecords = new ArrayList<>();
}
@@ -157,12 +157,12 @@ public class TestAbstractConnectWriter {
}
@Override
- protected void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record) {
+ protected void writeHudiRecord(HoodieRecord<?> record) {
writtenRecords.add(record);
}
@Override
- protected List<WriteStatus> flushHudiRecords() {
+ protected List<WriteStatus> flushRecords() {
return null;
}
}
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java
index d1813e1..b0dcf38 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java
@@ -88,7 +88,7 @@ public class TestBufferedConnectWriter {
Mockito.verify(mockHoodieJavaWriteClient, times(0))
        .bulkInsertPreppedRecords(anyList(), eq(COMMIT_TIME), eq(Option.empty()));
- writer.flushHudiRecords();
+ writer.flushRecords();
    final ArgumentCaptor<List<HoodieRecord>> actualRecords = ArgumentCaptor.forClass(List.class);
    Mockito.verify(mockHoodieJavaWriteClient, times(1))
        .bulkInsertPreppedRecords(actualRecords.capture(), eq(COMMIT_TIME), eq(Option.empty()));