This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 57d5da6  [HUDI-2330][HUDI-2335] Adding support for merge-on-read tables (#3679)
57d5da6 is described below

commit 57d5da68aa646adfd327394daea720cb3d592b1d
Author: vinoth chandar <[email protected]>
AuthorDate: Thu Sep 16 15:24:34 2021 -0700

    [HUDI-2330][HUDI-2335] Adding support for merge-on-read tables (#3679)
    
    - Inserts go into logs, hashed by Kafka and Hudi partitions
    - Fixed issues with the setupKafka script
    - Bumped up the default commit interval to 300 seconds
    - Minor renaming
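
    For context, a minimal self-contained sketch of the hashing idea behind the first bullet:
    each record is routed to a file id derived from its Kafka partition and Hudi partition path,
    mirroring the hashDigest helper this commit adds to KafkaConnectUtils. The class name and
    sample values below are illustrative only.

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    // Hypothetical sketch: derive a deterministic file-id prefix from the
    // Kafka partition and the Hudi partition path, as the connect writer does.
    public class FileIdHashSketch {
      static String hashDigest(String stringToHash) {
        try {
          MessageDigest md = MessageDigest.getInstance("MD5");
          byte[] digest = md.digest(stringToHash.getBytes(StandardCharsets.UTF_8));
          StringBuilder hex = new StringBuilder();
          for (byte b : digest) {
            hex.append(String.format("%02X", b & 0xFF));
          }
          return hex.toString();
        } catch (NoSuchAlgorithmException e) {
          throw new IllegalStateException("MD5 not available", e);
        }
      }

      public static void main(String[] args) {
        int kafkaPartition = 3;                   // sample value
        String hudiPartitionPath = "partition-1"; // sample value
        // The same (kafka partition, hudi partition) pair always maps to the
        // same prefix, so its inserts land in the same log file group.
        System.out.println(hashDigest(String.format("%s-%s", kafkaPartition, hudiPartitionPath)));
      }
    }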
---
 .../java/org/apache/hudi/client/WriteStatus.java   |   1 +
 .../hudi/table/HoodieJavaMergeOnReadTable.java     |  28 ++++-
 .../org/apache/hudi/table/HoodieJavaTable.java     |   5 +-
 .../BaseJavaDeltaCommitActionExecutor.java         |  35 ++++++
 ...JavaUpsertPreppedDeltaCommitActionExecutor.java | 102 ++++++++++++++++
 hudi-kafka-connect/README.md                       |   8 +-
 hudi-kafka-connect/demo/config-sink.json           |   3 +-
 hudi-kafka-connect/demo/setupKafka.sh              | 131 ++++++++++++---------
 .../connect/KafkaConnectFileIdPrefixProvider.java  |  21 +---
 .../hudi/connect/utils/KafkaConnectUtils.java      |  17 +++
 .../connect/writers/AbstractConnectWriter.java     |  22 +++-
 .../connect/writers/BufferedConnectWriter.java     |  35 ++++--
 .../hudi/connect/writers/KafkaConnectConfigs.java  |   2 +-
 .../writers/KafkaConnectTransactionServices.java   |  21 +---
 .../hudi/writers/TestAbstractConnectWriter.java    |   6 +-
 .../hudi/writers/TestBufferedConnectWriter.java    |   2 +-
 16 files changed, 315 insertions(+), 124 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index a731384..8f74858 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -201,6 +201,7 @@ public class WriteStatus implements Serializable {
   public String toString() {
     final StringBuilder sb = new StringBuilder("WriteStatus {");
     sb.append("fileId=").append(fileId);
+    sb.append(", writeStat=").append(stat);
     sb.append(", globalError='").append(globalError).append('\'');
     sb.append(", hasErrors='").append(hasErrors()).append('\'');
     sb.append(", errorCount='").append(totalErrorRecords).append('\'');
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
index 4995af0..a78b71b 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
@@ -18,14 +18,40 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.JavaUpsertPreppedDeltaCommitActionExecutor;
+
+import java.util.List;
 
 public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieJavaCopyOnWriteTable<T> {
   protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
     super(config, context, metaClient);
   }
-  // TODO not support yet.
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
+                                                              String instantTime,
+                                                              List<HoodieRecord<T>> preppedRecords) {
+    return new JavaUpsertPreppedDeltaCommitActionExecutor<>((HoodieJavaEngineContext) context, config,
+        this, instantTime, preppedRecords).execute();
+
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context,
+                                                                  String instantTime,
+                                                                  List<HoodieRecord<T>> preppedRecords,
+                                                                  Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+    return new JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context, config,
+        this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
+  }
 }
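
A minimal, hypothetical usage sketch of the path enabled above, assuming a HoodieJavaWriteClient configured against a MERGE_ON_READ table; the helper class and names are illustrative, and upsertPreppedRecords is the same client call used by the Kafka Connect writer further below:

import java.util.List;

import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;

// Hypothetical helper: flush already-prepped records through the Java write client.
// With hoodie.table.type=MERGE_ON_READ this now reaches
// HoodieJavaMergeOnReadTable#upsertPrepped instead of throwing HoodieNotSupportedException.
final class MorFlushSketch {
  static List<WriteStatus> flush(HoodieJavaWriteClient<HoodieAvroPayload> client,
                                 List<HoodieRecord<HoodieAvroPayload>> preppedRecords,
                                 String instantTime) {
    return client.upsertPreppedRecords(preppedRecords, instantTime);
  }
}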
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index 219dec4..9cf9a6d 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -29,9 +29,8 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.index.JavaHoodieIndex;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.JavaHoodieIndex;
 
 import java.util.List;
 
@@ -56,7 +55,7 @@ public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
       case COPY_ON_WRITE:
         return new HoodieJavaCopyOnWriteTable<>(config, context, metaClient);
       case MERGE_ON_READ:
-        throw new HoodieNotSupportedException("MERGE_ON_READ is not supported yet");
+        return new HoodieJavaMergeOnReadTable<>(config, context, metaClient);
       default:
         throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
     }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseJavaDeltaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseJavaDeltaCommitActionExecutor.java
new file mode 100644
index 0000000..0b4a654
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseJavaDeltaCommitActionExecutor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor;
+
+public abstract class BaseJavaDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
+
+  public BaseJavaDeltaCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
+                                           String instantTime, WriteOperationType operationType) {
+    super(context, config, table, instantTime, operationType);
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/JavaUpsertPreppedDeltaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/JavaUpsertPreppedDeltaCommitActionExecutor.java
new file mode 100644
index 0000000..f6faa28
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/JavaUpsertPreppedDeltaCommitActionExecutor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.deltacommit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.io.HoodieAppendHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.JavaBulkInsertHelper;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+public class JavaUpsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaDeltaCommitActionExecutor<T> {
+
+  private static final Logger LOG = LogManager.getLogger(JavaUpsertPreppedDeltaCommitActionExecutor.class);
+
+  private final List<HoodieRecord<T>> preppedInputRecords;
+
+  public JavaUpsertPreppedDeltaCommitActionExecutor(HoodieJavaEngineContext context, HoodieWriteConfig config, HoodieTable table,
+                                                    String instantTime, List<HoodieRecord<T>> preppedInputRecords) {
+    super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
+    this.preppedInputRecords = preppedInputRecords;
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
+    // First group by target file id.
+    HashMap<Pair<String, String>, List<HoodieRecord<T>>> recordsByFileId = new HashMap<>();
+    List<HoodieRecord<T>> insertedRecords = new LinkedList<>();
+
+    // Split records into inserts and updates.
+    for (HoodieRecord<T> record : preppedInputRecords) {
+      if (!record.isCurrentLocationKnown()) {
+        insertedRecords.add(record);
+      } else {
+        Pair<String, String> fileIdPartitionPath = Pair.of(record.getCurrentLocation().getFileId(), record.getPartitionPath());
+        if (!recordsByFileId.containsKey(fileIdPartitionPath)) {
+          recordsByFileId.put(fileIdPartitionPath, new LinkedList<>());
+        }
+        recordsByFileId.get(fileIdPartitionPath).add(record);
+      }
+    }
+    LOG.info(String.format("Total update fileIDs %s, total inserts %s for 
commit %s",
+        recordsByFileId.size(), insertedRecords.size(), instantTime));
+
+    List<WriteStatus> allWriteStatuses = new ArrayList<>();
+    try {
+      recordsByFileId.forEach((k, v) -> {
+        HoodieAppendHandle<?, ?, ?, ?> appendHandle = new HoodieAppendHandle(config, instantTime, table,
+            k.getRight(), k.getLeft(), v.iterator(), taskContextSupplier);
+        appendHandle.doAppend();
+        allWriteStatuses.addAll(appendHandle.close());
+      });
+
+      if (insertedRecords.size() > 0) {
+        HoodieWriteMetadata<List<WriteStatus>> insertResult = JavaBulkInsertHelper.newInstance()
+            .bulkInsert(insertedRecords, instantTime, table, config, this, false, Option.empty());
+        allWriteStatuses.addAll(insertResult.getWriteStatuses());
+      }
+    } catch (Throwable e) {
+      if (e instanceof HoodieUpsertException) {
+        throw e;
+      }
+      throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
+    }
+
+    updateIndex(allWriteStatuses, result);
+    return result;
+  }
+}
diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md
index 85fc009..584fddf 100644
--- a/hudi-kafka-connect/README.md
+++ b/hudi-kafka-connect/README.md
@@ -70,9 +70,9 @@ Wait until the kafka cluster is up and running.
 
 ### 2 - Set up the schema registry
 
-Hudi leverages schema registry to obtain the latest schema when writing records. While it supports most popular schema registries,
-we use Confluent schema registry. Download the latest confluent schema registry code from https://github.com/confluentinc/schema-registry
-and start the schema registry service.
+Hudi leverages schema registry to obtain the latest schema when writing records. While it supports most popular schema
+registries, we use Confluent schema registry. Download the latest confluent platform and run the schema registry
+service.
 
 ```bash
 cd $CONFLUENT_DIR
@@ -120,7 +120,7 @@ that can be changed based on the desired properties.
 
 ```bash
 curl -X DELETE http://localhost:8083/connectors/hudi-sink
-curl -X POST -H "Content-Type:application/json" -d @$HUDI-DIR/hudi-kafka-connect/demo/config-sink.json http://localhost:8083/connectors
+curl -X POST -H "Content-Type:application/json" -d @${HUDI_DIR}/hudi-kafka-connect/demo/config-sink.json http://localhost:8083/connectors
 ```
 
 Now, you should see that the connector is created and tasks are running.
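
As a quick sanity check (not part of this change), the standard Kafka Connect REST endpoints can confirm the sink named in the demo config is registered and its tasks are running:

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/hudi-sink/status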
diff --git a/hudi-kafka-connect/demo/config-sink.json b/hudi-kafka-connect/demo/config-sink.json
index 75e6d84..2d2be00 100644
--- a/hudi-kafka-connect/demo/config-sink.json
+++ b/hudi-kafka-connect/demo/config-sink.json
@@ -9,10 +9,11 @@
                "value.converter.schemas.enable": "false",
                "topics": "hudi-test-topic",
                "hoodie.table.name": "hudi-test-topic",
+               "hoodie.table.type": "MERGE_ON_READ",
                "hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
                "hoodie.datasource.write.recordkey.field": "volume",
                "hoodie.datasource.write.partitionpath.field": "date",
                "hoodie.schemaprovider.class": 
"org.apache.hudi.schema.SchemaRegistryProvider",
                "hoodie.deltastreamer.schemaprovider.registry.url": 
"http://localhost:8081/subjects/hudi-test-topic/versions/latest";
-    }
+       }
 }
diff --git a/hudi-kafka-connect/demo/setupKafka.sh b/hudi-kafka-connect/demo/setupKafka.sh
old mode 100644
new mode 100755
index f2c1735..81968a4
--- a/hudi-kafka-connect/demo/setupKafka.sh
+++ b/hudi-kafka-connect/demo/setupKafka.sh
@@ -16,38 +16,33 @@
 
 #!/bin/bash
 
-## Directories
-HOME_DIR=~
-HUDI_DIR=${HOME_DIR}/hudi
-KAFKA_HOME=${HOME_DIR}/kafka
-
 #########################
 # The command line help #
 #########################
 usage() {
-    echo "Usage: $0"
-    echo "   -n |--num-kafka-records, (required) number of kafka records to 
generate"
-    echo "   -f |--raw-file, (optional) raw file for the kafka records"
-    echo "   -k |--kafka-topic, (optional) Topic name for Kafka"
-    echo "   -m |--num-kafka-partitions, (optional) number of kafka partitions"
-    echo "   -r |--record-key, (optional) field to use as record key"
-    echo "   -l |--num-hudi-partitions, (optional) number of hudi partitions"
-    echo "   -p |--partition-key, (optional) field to use as partition"
-    echo "   -s |--schema-file, (optional) path of the file containing the 
schema of the records"
-    exit 1
+  echo "Usage: $0"
+  echo "   -n |--num-kafka-records, (required) number of kafka records to 
generate"
+  echo "   -f |--raw-file, (optional) raw file for the kafka records"
+  echo "   -k |--kafka-topic, (optional) Topic name for Kafka"
+  echo "   -m |--num-kafka-partitions, (optional) number of kafka partitions"
+  echo "   -r |--record-key, (optional) field to use as record key"
+  echo "   -l |--num-hudi-partitions, (optional) number of hudi partitions"
+  echo "   -p |--partition-key, (optional) field to use as partition"
+  echo "   -s |--schema-file, (optional) path of the file containing the 
schema of the records"
+  exit 1
 }
 
 case "$1" in
-   --help)
-       usage
-       exit 0
-       ;;
+--help)
+  usage
+  exit 0
+  ;;
 esac
 
 if [ $# -lt 1 ]; then
-    echo "Illegal number of parameters"
-    usage
-    exit 0
+  echo "Illegal number of parameters"
+  usage
+  exit 0
 fi
 
 ## defaults
@@ -61,71 +56,91 @@ schemaFile=${HUDI_DIR}/docker/demo/config/schema.avsc
 
 while getopts ":n:f:k:m:r:l:p:s:-:" opt; do
   case $opt in
-    n) num_records="$OPTARG"
+  n)
+    num_records="$OPTARG"
     printf "Argument num-kafka-records is %s\n" "$num_records"
     ;;
-    k) rawDataFile="$OPTARG"
+  k)
+    rawDataFile="$OPTARG"
     printf "Argument raw-file is %s\n" "$rawDataFile"
     ;;
-    f) kafkaTopicName="$OPTARG"
+  f)
+    kafkaTopicName="$OPTARG"
     printf "Argument kafka-topic is %s\n" "$kafkaTopicName"
     ;;
-    m) numKafkaPartitions="$OPTARG"
+  m)
+    numKafkaPartitions="$OPTARG"
     printf "Argument num-kafka-partitions is %s\n" "$numKafkaPartitions"
     ;;
-    r) recordKey="$OPTARG"
+  r)
+    recordKey="$OPTARG"
     printf "Argument record-key is %s\n" "$recordKey"
     ;;
-    l) numHudiPartitions="$OPTARG"
+  l)
+    numHudiPartitions="$OPTARG"
     printf "Argument num-hudi-partitions is %s\n" "$numHudiPartitions"
     ;;
-    p) partitionField="$OPTARG"
+  p)
+    partitionField="$OPTARG"
     printf "Argument partition-key is %s\n" "$partitionField"
     ;;
-    p) schemaFile="$OPTARG"
+  s)
+    schemaFile="$OPTARG"
     printf "Argument schema-file is %s\n" "$schemaFile"
     ;;
-    -) echo "Invalid option -$OPTARG" >&2
+  -)
+    echo "Invalid option -$OPTARG" >&2
     ;;
-esac
+  esac
 done
 
 # First delete the existing topic
-$KAFKA_HOME/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
+#${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092
 
 # Create the topic with 4 partitions
-$KAFKA_HOME/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
-
+#${KAFKA_HOME}/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092
 
 # Setup the schema registry
-export SCHEMA=`sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring`
+export SCHEMA=$(sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring)
 curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schema\": $SCHEMA}" http://localhost:8081/subjects/${kafkaTopicName}/versions
 curl -X GET http://localhost:8081/subjects/${kafkaTopicName}/versions/latest
 
-
 # Generate kafka messages from raw records
 # Each record gets a unique key, and messages are generated evenly across each hudi partition
 partitions={}
-for ((i=0; i<${numHudiPartitions}; i++))
-do
-    partitions[$i]="partition-"$i;
+for ((i = 0; i < ${numHudiPartitions}; i++)); do
+  partitions[$i]="partition-"$i
+done
+
+events_file=/tmp/kcat-input.events
+rm -f ${events_file}
+
+recordValue=0
+num_records=$((num_records + 0))
+
+for (( ; ; )); do
+  while IFS= read line; do
+    for partitionValue in "${partitions[@]}"; do
+      echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' >>${events_file}
+      ((recordValue = recordValue + 1))
+
+      if [ $recordValue -gt $num_records ]; then
+        break
+      fi
+    done
+
+    if [ $recordValue -gt $num_records ]; then
+      break
+    fi
+
+    if [ $(($recordValue % 1000)) -eq 0 ]; then
+      sleep 1
+    fi
+  done <"$rawDataFile"
+
+  if [ $recordValue -gt $num_records ]; then
+    break
+  fi
 done
 
-for ((recordValue=0; recordValue<=${num_records}; ))
-do 
-    while IFS= read line 
-    do
-        for partitionValue in "${partitions[@]}"
-        do
-            echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' | kafkacat -P -b localhost:9092 -t hudi-test-topic;
-            ((recordValue++));
-            if [ $recordValue -gt ${num_records} ]; then
-                exit 0
-            fi
-        done
-        
-        if [ $(( $recordValue % 1000 )) -eq 0 ]
-            then sleep 1
-        fi
-    done < "$rawDataFile"
-done 
+grep -v '^$' ${events_file} | kcat -P -b localhost:9092 -t hudi-test-topic
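
For reference, a hypothetical invocation of the reworked script, following the usage text above and the demo defaults (the topic, record key and partition field match config-sink.json; the file paths are placeholders), assuming Kafka, the schema registry and kcat are running locally:

./setupKafka.sh -n 10000 -f /path/to/raw-records.json -k hudi-test-topic -m 4 -r volume -l 2 -p date -s /path/to/schema.avsc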
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
index 536ad4a..9c46747 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
@@ -18,17 +18,13 @@
 
 package org.apache.hudi.connect;
 
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.FileIdPrefixProvider;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Objects;
 import java.util.Properties;
 
 public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {
@@ -52,18 +48,9 @@ public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {
     // We use a combination of kafka partition and partition path as the file id, and then hash it
     // to generate a fixed sized hash.
     String rawFileIdPrefix = kafkaPartition + partitionPath;
-    MessageDigest md;
-    try {
-      md = MessageDigest.getInstance("MD5");
-    } catch (NoSuchAlgorithmException e) {
-      LOG.error("Fatal error selecting hash algorithm", e);
-      throw new HoodieException(e);
-    }
-
-    byte[] digest = Objects.requireNonNull(md).digest(rawFileIdPrefix.getBytes(StandardCharsets.UTF_8));
-
+    String hashedPrefix = KafkaConnectUtils.hashDigest(rawFileIdPrefix);
     LOG.info("CreateFileId for Kafka Partition " + kafkaPartition + " : " + 
partitionPath + " = " + rawFileIdPrefix
-        + " === " + StringUtils.toHexString(digest).toUpperCase());
-    return StringUtils.toHexString(digest).toUpperCase();
+        + " === " + hashedPrefix);
+    return hashedPrefix;
   }
 }
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index 593cfb1..34a44c8 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.CustomAvroKeyGenerator;
@@ -41,8 +42,12 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
@@ -137,4 +142,16 @@ public class KafkaConnectUtils {
       return Option.empty();
     }
   }
+  
+  public static String hashDigest(String stringToHash) {
+    MessageDigest md;
+    try {
+      md = MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException e) {
+      LOG.error("Fatal error selecting hash algorithm", e);
+      throw new HoodieException(e);
+    }
+    byte[] digest = Objects.requireNonNull(md).digest(stringToHash.getBytes(StandardCharsets.UTF_8));
+    return StringUtils.toHexString(digest).toUpperCase();
+  }
 }
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
index 3d8e5f8..9888fd1 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
@@ -21,7 +21,9 @@ package org.apache.hudi.connect.writers;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
@@ -46,17 +48,19 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
   public static final String KAFKA_JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
   public static final String KAFKA_STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter";
   private static final Logger LOG = LogManager.getLogger(AbstractConnectWriter.class);
+  protected final String instantTime;
 
-  private final KafkaConnectConfigs connectConfigs;
   private final KeyGenerator keyGenerator;
   private final SchemaProvider schemaProvider;
+  protected final KafkaConnectConfigs connectConfigs;
 
   public AbstractConnectWriter(KafkaConnectConfigs connectConfigs,
                                KeyGenerator keyGenerator,
-                               SchemaProvider schemaProvider) {
+                               SchemaProvider schemaProvider, String instantTime) {
     this.connectConfigs = connectConfigs;
     this.keyGenerator = keyGenerator;
     this.schemaProvider = schemaProvider;
+    this.instantTime = instantTime;
   }
 
   @Override
@@ -76,16 +80,22 @@ public abstract class AbstractConnectWriter implements ConnectWriter<WriteStatus
         throw new IOException("Unsupported Kafka Format type (" + connectConfigs.getKafkaValueConverter() + ")");
     }
 
-    HoodieRecord hoodieRecord = new HoodieRecord<>(keyGenerator.getKey(avroRecord.get()), new HoodieAvroPayload(avroRecord));
+    // Tag records with a file ID based on kafka partition and hudi partition.
+    HoodieRecord<?> hoodieRecord = new HoodieRecord<>(keyGenerator.getKey(avroRecord.get()), new HoodieAvroPayload(avroRecord));
+    String fileId = KafkaConnectUtils.hashDigest(String.format("%s-%s", record.kafkaPartition(), hoodieRecord.getPartitionPath()));
+    hoodieRecord.unseal();
+    hoodieRecord.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId));
+    hoodieRecord.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
+    hoodieRecord.seal();
     writeHudiRecord(hoodieRecord);
   }
 
   @Override
   public List<WriteStatus> close() throws IOException {
-    return flushHudiRecords();
+    return flushRecords();
   }
 
-  protected abstract void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record);
+  protected abstract void writeHudiRecord(HoodieRecord<?> record);
 
-  protected abstract List<WriteStatus> flushHudiRecords() throws IOException;
+  protected abstract List<WriteStatus> flushRecords() throws IOException;
 }
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
index a60293d..0449f07 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
@@ -21,8 +21,9 @@ package org.apache.hudi.connect.writers;
 import org.apache.hudi.client.HoodieJavaWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
@@ -39,8 +40,8 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Specific implementation of a Hudi Writer that buffers all incoming records,
@@ -52,9 +53,8 @@ public class BufferedConnectWriter extends 
AbstractConnectWriter {
 
   private final HoodieEngineContext context;
   private final HoodieJavaWriteClient writeClient;
-  private final String instantTime;
   private final HoodieWriteConfig config;
-  private ExternalSpillableMap<String, HoodieRecord<HoodieAvroPayload>> bufferedRecords;
+  private ExternalSpillableMap<String, HoodieRecord<?>> bufferedRecords;
 
   public BufferedConnectWriter(HoodieEngineContext context,
                                HoodieJavaWriteClient writeClient,
@@ -63,10 +63,9 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
                                HoodieWriteConfig config,
                                KeyGenerator keyGenerator,
                                SchemaProvider schemaProvider) {
-    super(connectConfigs, keyGenerator, schemaProvider);
+    super(connectConfigs, keyGenerator, schemaProvider, instantTime);
     this.context = context;
     this.writeClient = writeClient;
-    this.instantTime = instantTime;
     this.config = config;
     init();
   }
@@ -88,12 +87,12 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
   }
 
   @Override
-  public void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record) {
+  public void writeHudiRecord(HoodieRecord<?> record) {
     bufferedRecords.put(record.getRecordKey(), record);
   }
 
   @Override
-  public List<WriteStatus> flushHudiRecords() throws IOException {
+  public List<WriteStatus> flushRecords() throws IOException {
     try {
       LOG.info("Number of entries in MemoryBasedMap => "
           + bufferedRecords.getInMemoryMapNumEntries()
@@ -102,15 +101,25 @@ public class BufferedConnectWriter extends AbstractConnectWriter {
           + bufferedRecords.getDiskBasedMapNumEntries() + "Size of file spilled to disk => "
           + bufferedRecords.getSizeOfFileOnDiskInBytes());
       List<WriteStatus> writeStatuses = new ArrayList<>();
+
+      boolean isMorTable = Option.ofNullable(connectConfigs.getString(HoodieTableConfig.TYPE))
+          .map(t -> t.equals(HoodieTableType.MERGE_ON_READ.name()))
+          .orElse(false);
+
       // Write out all records if non-empty
       if (!bufferedRecords.isEmpty()) {
-        writeStatuses = writeClient.bulkInsertPreppedRecords(
-          bufferedRecords.values().stream().collect(Collectors.toList()),
-        instantTime, Option.empty());
+        if (isMorTable) {
+          writeStatuses = writeClient.upsertPreppedRecords(
+              new LinkedList<>(bufferedRecords.values()),
+              instantTime);
+        } else {
+          writeStatuses = writeClient.bulkInsertPreppedRecords(
+              new LinkedList<>(bufferedRecords.values()),
+              instantTime, Option.empty());
+        }
       }
       bufferedRecords.close();
-      LOG.info("Flushed hudi records and got writeStatuses: "
-          + writeStatuses);
+      LOG.info("Flushed hudi records and got writeStatuses: " + writeStatuses);
       return writeStatuses;
     } catch (Exception e) {
       throw new IOException("Write records failed", e);
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
index ae6b5d1..e5662bd 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
@@ -67,7 +67,7 @@ public class KafkaConnectConfigs extends HoodieConfig {
 
   public static final ConfigProperty<String> COORDINATOR_WRITE_TIMEOUT_SECS = ConfigProperty
       .key("hoodie.kafka.coordinator.write.timeout.secs")
-      .defaultValue("60")
+      .defaultValue("300")
       .withDocumentation("The timeout after sending an END_COMMIT until when "
           + "the coordinator will wait for the write statuses from all the partitions"
           + "to ignore the current commit and start a new commit.");
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index ad40ebc..8039e56 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -23,12 +23,10 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.connect.transaction.TransactionCoordinator;
@@ -38,7 +36,6 @@ import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -54,19 +51,16 @@ import java.util.Map;
 public class KafkaConnectTransactionServices implements ConnectTransactionServices {
 
   private static final Logger LOG = LogManager.getLogger(KafkaConnectTransactionServices.class);
-  private static final String TABLE_FORMAT = "PARQUET";
 
   private final Option<HoodieTableMetaClient> tableMetaClient;
   private final Configuration hadoopConf;
-  private final FileSystem fs;
   private final String tableBasePath;
   private final String tableName;
   private final HoodieEngineContext context;
 
   private final HoodieJavaWriteClient<HoodieAvroPayload> javaClient;
 
-  public KafkaConnectTransactionServices(
-      KafkaConnectConfigs connectConfigs) throws HoodieException {
+  public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throws HoodieException {
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
         .withProperties(connectConfigs.getProps()).build();
 
@@ -74,29 +68,25 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
     tableName = writeConfig.getTableName();
     hadoopConf = KafkaConnectUtils.getDefaultHadoopConf();
     context = new HoodieJavaEngineContext(hadoopConf);
-    fs = FSUtils.getFs(tableBasePath, hadoopConf);
 
     try {
       KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(
           new TypedProperties(connectConfigs.getProps()));
-
       String recordKeyFields = KafkaConnectUtils.getRecordKeyColumns(keyGenerator);
       String partitionColumns = KafkaConnectUtils.getPartitionColumns(keyGenerator,
           new TypedProperties(connectConfigs.getProps()));
 
-      LOG.info(String.format("Setting record key %s and partitionfields %s for table %s",
-          recordKeyFields,
-          partitionColumns,
-          tableBasePath + tableName));
+      LOG.info(String.format("Setting record key %s and partition fields %s for table %s",
+          recordKeyFields, partitionColumns, tableBasePath + tableName));
 
       tableMetaClient = Option.of(HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(HoodieTableType.COPY_ON_WRITE.name())
           .setTableName(tableName)
           .setPayloadClassName(HoodieAvroPayload.class.getName())
-          .setBaseFileFormat(TABLE_FORMAT)
           .setRecordKeyFields(recordKeyFields)
           .setPartitionFields(partitionColumns)
           .setKeyGeneratorClassProp(writeConfig.getKeyGeneratorClass())
+          .fromProperties(connectConfigs.getProps())
           .initTable(hadoopConf, tableBasePath));
 
       javaClient = new HoodieJavaWriteClient<>(context, writeConfig);
@@ -113,8 +103,7 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
   }
 
   public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
-    javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata),
-        HoodieActiveTimeline.COMMIT_ACTION, Collections.emptyMap());
+    javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
     LOG.info("Ending Hudi commit " + commitTime);
   }
 
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
index 3ca64c3..c8a3ad6 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
@@ -148,7 +148,7 @@ public class TestAbstractConnectWriter {
     private List<HoodieRecord> writtenRecords;
 
     public AbstractHudiConnectWriterTestWrapper(KafkaConnectConfigs connectConfigs, KeyGenerator keyGenerator, SchemaProvider schemaProvider) {
-      super(connectConfigs, keyGenerator, schemaProvider);
+      super(connectConfigs, keyGenerator, schemaProvider, "000");
       writtenRecords = new ArrayList<>();
     }
 
@@ -157,12 +157,12 @@ public class TestAbstractConnectWriter {
     }
 
     @Override
-    protected void writeHudiRecord(HoodieRecord<HoodieAvroPayload> record) {
+    protected void writeHudiRecord(HoodieRecord<?> record) {
       writtenRecords.add(record);
     }
 
     @Override
-    protected List<WriteStatus> flushHudiRecords() {
+    protected List<WriteStatus> flushRecords() {
       return null;
     }
   }
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java
index d1813e1..b0dcf38 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestBufferedConnectWriter.java
@@ -88,7 +88,7 @@ public class TestBufferedConnectWriter {
     Mockito.verify(mockHoodieJavaWriteClient, times(0))
         .bulkInsertPreppedRecords(anyList(), eq(COMMIT_TIME), eq(Option.empty()));
 
-    writer.flushHudiRecords();
+    writer.flushRecords();
     final ArgumentCaptor<List<HoodieRecord>> actualRecords = ArgumentCaptor.forClass(List.class);
     Mockito.verify(mockHoodieJavaWriteClient, times(1))
         .bulkInsertPreppedRecords(actualRecords.capture(), eq(COMMIT_TIME), eq(Option.empty()));
