yanghua commented on a change in pull request #2176:
URL: https://github.com/apache/hudi/pull/2176#discussion_r519572985



##########
File path: 
hudi-flink-writer/src/main/java/org/apache/hudi/source/KafkaJson2HoodieRecord.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.schema.FilebasedSchemaProvider;
+import org.apache.hudi.util.AvroConvertor;
+import org.apache.hudi.util.StreamerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class KafkaJson2HoodieRecord implements MapFunction<String, 
HoodieRecord> {
+
+  private static Logger LOG = 
LoggerFactory.getLogger(KafkaJson2HoodieRecord.class);
+
+  private final HudiFlinkStreamer.Config cfg;
+  private TypedProperties props;
+  private KeyGenerator keyGenerator;
+  private AvroConvertor convertor;

Review comment:
       Add a new empty line.

##########
File path: 
hudi-flink-writer/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieFlinkStreamerException;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, 
HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> implements 
CheckpointedFunction {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(KeyedWriteProcessFunction.class);
+  private List<HoodieRecord> records = new LinkedList<>();
+  private Collector<Tuple3<String, List<WriteStatus>, Integer>> output;
+  private int indexOfThisSubtask;
+  private String latestInstant;
+  private boolean hasRecordsIn;
+
+  /**
+   * Job conf.
+   */
+  private HudiFlinkStreamer.Config cfg;
+  /**
+   * Serializable hadoop conf.
+   */
+  private SerializableConfiguration serializableHadoopConf;
+  /**
+   * HoodieWriteConfig.
+   */
+  private HoodieWriteConfig writeConfig;

Review comment:
       Can be a local variable.

##########
File path: 
hudi-flink-writer/src/main/java/org/apache/hudi/source/KafkaJson2HoodieRecord.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.schema.FilebasedSchemaProvider;
+import org.apache.hudi.util.AvroConvertor;
+import org.apache.hudi.util.StreamerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class KafkaJson2HoodieRecord implements MapFunction<String, 
HoodieRecord> {

Review comment:
       It would be better to add `xxxFunction` as the suffix. wdyt?

##########
File path: 
hudi-flink-writer/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieFlinkStreamerException;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, 
HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> implements 
CheckpointedFunction {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(KeyedWriteProcessFunction.class);
+  private List<HoodieRecord> records = new LinkedList<>();
+  private Collector<Tuple3<String, List<WriteStatus>, Integer>> output;
+  private int indexOfThisSubtask;
+  private String latestInstant;
+  private boolean hasRecordsIn;
+
+  /**
+   * Job conf.
+   */
+  private HudiFlinkStreamer.Config cfg;
+  /**
+   * Serializable hadoop conf.
+   */
+  private SerializableConfiguration serializableHadoopConf;
+  /**
+   * HoodieWriteConfig.
+   */
+  private HoodieWriteConfig writeConfig;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+
+    indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+
+    cfg = (HudiFlinkStreamer.Config) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+
+    // hadoopConf
+    serializableHadoopConf = new SerializableConfiguration(new 
org.apache.hadoop.conf.Configuration());
+    // HoodieWriteConfig
+    writeConfig = StreamerUtil.getHoodieClientConfig(cfg);
+
+    HoodieFlinkEngineContext context = new 
HoodieFlinkEngineContext(serializableHadoopConf, new 
FlinkTaskContextSupplier(getRuntimeContext()));
+
+    // writeClient
+    writeClient = new HoodieFlinkWriteClient<>(context, writeConfig);
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    // get latest requested instant
+    String commitType = 
cfg.tableType.equals(HoodieTableType.COPY_ON_WRITE.name()) ? 
HoodieTimeline.COMMIT_ACTION : HoodieTimeline.DELTA_COMMIT_ACTION;
+    List<String> latestInstants = 
writeClient.getInflightsAndRequestedInstants(commitType);
+    latestInstant = latestInstants.isEmpty() ? null : latestInstants.get(0);
+
+    if (output != null && latestInstant != null && records.size() > 0) {
+      hasRecordsIn = true;
+      String instantTimestamp = latestInstant;
+      LOG.info("Upsert records, subtask id = [{}]  checkpoint_id = [{}}] 
instant = [{}], record size = [{}]", indexOfThisSubtask, 
context.getCheckpointId(), instantTimestamp, records.size());
+
+      List<WriteStatus> writeStatus;
+      switch (cfg.operation) {
+        case INSERT:
+          writeStatus = writeClient.insert(records, instantTimestamp);
+          break;
+        case UPSERT:
+          writeStatus = writeClient.upsert(records, instantTimestamp);
+          break;
+        default:
+          throw new HoodieFlinkStreamerException("Unknown operation : " + 
cfg.operation);
+      }
+      output.collect(new Tuple3<>(instantTimestamp, writeStatus, 
indexOfThisSubtask));
+      records.clear();
+    }
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext 
functionInitializationContext) throws Exception {
+    // no operation
+  }
+
+  @Override
+  public void processElement(HoodieRecord hoodieRecord, Context context, 
Collector<Tuple3<String, List<WriteStatus>, Integer>> collector) throws 
Exception {
+    records.add(hoodieRecord);

Review comment:
       I am wondering about one thing: if the checkpoint interval is too long, 
could this buffer grow unbounded and cause an OOM?

##########
File path: 
hudi-flink-writer/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieFlinkStreamerException;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, 
HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> implements 
CheckpointedFunction {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(KeyedWriteProcessFunction.class);
+  private List<HoodieRecord> records = new LinkedList<>();
+  private Collector<Tuple3<String, List<WriteStatus>, Integer>> output;
+  private int indexOfThisSubtask;
+  private String latestInstant;
+  private boolean hasRecordsIn;
+
+  /**
+   * Job conf.
+   */
+  private HudiFlinkStreamer.Config cfg;
+  /**
+   * Serializable hadoop conf.
+   */
+  private SerializableConfiguration serializableHadoopConf;
+  /**
+   * HoodieWriteConfig.
+   */
+  private HoodieWriteConfig writeConfig;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+
+    indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+
+    cfg = (HudiFlinkStreamer.Config) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+
+    // hadoopConf
+    serializableHadoopConf = new SerializableConfiguration(new 
org.apache.hadoop.conf.Configuration());
+    // HoodieWriteConfig
+    writeConfig = StreamerUtil.getHoodieClientConfig(cfg);
+
+    HoodieFlinkEngineContext context = new 
HoodieFlinkEngineContext(serializableHadoopConf, new 
FlinkTaskContextSupplier(getRuntimeContext()));
+
+    // writeClient
+    writeClient = new HoodieFlinkWriteClient<>(context, writeConfig);
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    // get latest requested instant
+    String commitType = 
cfg.tableType.equals(HoodieTableType.COPY_ON_WRITE.name()) ? 
HoodieTimeline.COMMIT_ACTION : HoodieTimeline.DELTA_COMMIT_ACTION;
+    List<String> latestInstants = 
writeClient.getInflightsAndRequestedInstants(commitType);
+    latestInstant = latestInstants.isEmpty() ? null : latestInstants.get(0);
+
+    if (output != null && latestInstant != null && records.size() > 0) {
+      hasRecordsIn = true;
+      String instantTimestamp = latestInstant;
+      LOG.info("Upsert records, subtask id = [{}]  checkpoint_id = [{}}] 
instant = [{}], record size = [{}]", indexOfThisSubtask, 
context.getCheckpointId(), instantTimestamp, records.size());
+
+      List<WriteStatus> writeStatus;
+      switch (cfg.operation) {
+        case INSERT:
+          writeStatus = writeClient.insert(records, instantTimestamp);
+          break;
+        case UPSERT:
+          writeStatus = writeClient.upsert(records, instantTimestamp);
+          break;
+        default:
+          throw new HoodieFlinkStreamerException("Unknown operation : " + 
cfg.operation);
+      }
+      output.collect(new Tuple3<>(instantTimestamp, writeStatus, 
indexOfThisSubtask));
+      records.clear();
+    }
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext 
functionInitializationContext) throws Exception {
+    // no operation
+  }
+
+  @Override
+  public void processElement(HoodieRecord hoodieRecord, Context context, 
Collector<Tuple3<String, List<WriteStatus>, Integer>> collector) throws 
Exception {

Review comment:
       The declared `Exception` is never thrown; it can be removed from the 
method signature.

##########
File path: 
hudi-flink-writer/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieFlinkStreamerException;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, 
HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> implements 
CheckpointedFunction {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(KeyedWriteProcessFunction.class);
+  private List<HoodieRecord> records = new LinkedList<>();
+  private Collector<Tuple3<String, List<WriteStatus>, Integer>> output;
+  private int indexOfThisSubtask;
+  private String latestInstant;
+  private boolean hasRecordsIn;
+
+  /**
+   * Job conf.
+   */
+  private HudiFlinkStreamer.Config cfg;
+  /**
+   * Serializable hadoop conf.
+   */
+  private SerializableConfiguration serializableHadoopConf;
+  /**
+   * HoodieWriteConfig.
+   */
+  private HoodieWriteConfig writeConfig;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+
+    indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+
+    cfg = (HudiFlinkStreamer.Config) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+
+    // hadoopConf
+    serializableHadoopConf = new SerializableConfiguration(new 
org.apache.hadoop.conf.Configuration());
+    // HoodieWriteConfig
+    writeConfig = StreamerUtil.getHoodieClientConfig(cfg);
+
+    HoodieFlinkEngineContext context = new 
HoodieFlinkEngineContext(serializableHadoopConf, new 
FlinkTaskContextSupplier(getRuntimeContext()));
+
+    // writeClient
+    writeClient = new HoodieFlinkWriteClient<>(context, writeConfig);
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    // get latest requested instant
+    String commitType = 
cfg.tableType.equals(HoodieTableType.COPY_ON_WRITE.name()) ? 
HoodieTimeline.COMMIT_ACTION : HoodieTimeline.DELTA_COMMIT_ACTION;
+    List<String> latestInstants = 
writeClient.getInflightsAndRequestedInstants(commitType);
+    latestInstant = latestInstants.isEmpty() ? null : latestInstants.get(0);
+
+    if (output != null && latestInstant != null && records.size() > 0) {
+      hasRecordsIn = true;
+      String instantTimestamp = latestInstant;
+      LOG.info("Upsert records, subtask id = [{}]  checkpoint_id = [{}}] 
instant = [{}], record size = [{}]", indexOfThisSubtask, 
context.getCheckpointId(), instantTimestamp, records.size());
+
+      List<WriteStatus> writeStatus;
+      switch (cfg.operation) {
+        case INSERT:
+          writeStatus = writeClient.insert(records, instantTimestamp);
+          break;
+        case UPSERT:
+          writeStatus = writeClient.upsert(records, instantTimestamp);
+          break;
+        default:
+          throw new HoodieFlinkStreamerException("Unknown operation : " + 
cfg.operation);
+      }
+      output.collect(new Tuple3<>(instantTimestamp, writeStatus, 
indexOfThisSubtask));
+      records.clear();
+    }
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext 
functionInitializationContext) throws Exception {

Review comment:
       The declared `Exception` is never thrown; it can be removed from the 
method signature.

##########
File path: 
hudi-flink-writer/src/main/java/org/apache/hudi/operator/WriteProcessOperator.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class WriteProcessOperator extends KeyedProcessOperator<String, 
HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> {
+  public static final String NAME = "WriteProcessOperator";
+  private static final Logger LOG = 
LoggerFactory.getLogger(WriteProcessOperator.class);
+  private KeyedWriteProcessFunction function;
+
+  public WriteProcessOperator(KeyedProcessFunction<String, HoodieRecord, 
Tuple3<String, List<WriteStatus>, Integer>> function) {
+    super(function);
+    this.function = (KeyedWriteProcessFunction) function;
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {

Review comment:
       It would be better to explain why we need to implement the 
`snapshotState` method in both the UDF and the operator.

##########
File path: hudi-flink-writer/src/main/java/org/apache/hudi/sink/CommitSink.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.util.StreamerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class CommitSink extends RichSinkFunction<Tuple3<String, 
List<WriteStatus>, Integer>> {
+  private static final Logger LOG = LoggerFactory.getLogger(CommitSink.class);
+  /**
+   * Job conf.
+   */
+  private HudiFlinkStreamer.Config cfg;
+
+  /**
+   * HoodieWriteConfig.
+   */
+  private HoodieWriteConfig writeConfig;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  private Map<String, List<List<WriteStatus>>> bufferedWriteStatus = new 
HashMap<>();
+
+  private Integer upsertParallelSize = 0;
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    // get configs from runtimeContext
+    cfg = (HudiFlinkStreamer.Config) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+    upsertParallelSize = 
getRuntimeContext().getExecutionConfig().getParallelism();
+
+    // HoodieWriteConfig
+    writeConfig = StreamerUtil.getHoodieClientConfig(cfg);
+
+    // writeClient
+    writeClient = new HoodieFlinkWriteClient<>(new 
HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), writeConfig);
+  }
+
+  @Override
+  public void invoke(Tuple3<String, List<WriteStatus>, Integer> value, Context 
context) throws Exception {
+    LOG.info("Receive records, instantTime = [{}], subtaskId = [{}], records 
size = [{}]", value.f0, value.f2, value.f1.size());
+    try {
+      if (bufferedWriteStatus.containsKey(value.f0)) {
+        bufferedWriteStatus.get(value.f0).add(value.f1);
+      } else {
+        List<List<WriteStatus>> oneBatchData = new 
ArrayList<>(upsertParallelSize);
+        oneBatchData.add(value.f1);
+        bufferedWriteStatus.put(value.f0, oneBatchData);
+      }
+      // check and commit
+      checkAndCommit(value.f0);
+    } catch (Exception e) {
+      LOG.error("Invoke sink error: " + Thread.currentThread().getId() + ";" + 
this);
+      throw e;
+    }
+  }
+
+  /**
+   * Check and commit if all subtask completed.
+   *
+   * @throws Exception
+   */
+  private boolean checkAndCommit(String instantTime) throws Exception {

Review comment:
       You do not use the returned value, right?

##########
File path: hudi-flink-writer/src/main/java/org/apache/hudi/sink/CommitSink.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.util.StreamerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class CommitSink extends RichSinkFunction<Tuple3<String, 
List<WriteStatus>, Integer>> {
+  private static final Logger LOG = LoggerFactory.getLogger(CommitSink.class);
+  /**
+   * Job conf.
+   */
+  private HudiFlinkStreamer.Config cfg;
+
+  /**
+   * HoodieWriteConfig.
+   */
+  private HoodieWriteConfig writeConfig;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  private Map<String, List<List<WriteStatus>>> bufferedWriteStatus = new 
HashMap<>();
+
+  private Integer upsertParallelSize = 0;
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    // get configs from runtimeContext
+    cfg = (HudiFlinkStreamer.Config) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+    upsertParallelSize = 
getRuntimeContext().getExecutionConfig().getParallelism();
+
+    // HoodieWriteConfig
+    writeConfig = StreamerUtil.getHoodieClientConfig(cfg);
+
+    // writeClient
+    writeClient = new HoodieFlinkWriteClient<>(new 
HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), writeConfig);
+  }
+
+  @Override
+  public void invoke(Tuple3<String, List<WriteStatus>, Integer> value, Context 
context) throws Exception {
+    LOG.info("Receive records, instantTime = [{}], subtaskId = [{}], records 
size = [{}]", value.f0, value.f2, value.f1.size());
+    try {
+      if (bufferedWriteStatus.containsKey(value.f0)) {
+        bufferedWriteStatus.get(value.f0).add(value.f1);
+      } else {
+        List<List<WriteStatus>> oneBatchData = new 
ArrayList<>(upsertParallelSize);
+        oneBatchData.add(value.f1);
+        bufferedWriteStatus.put(value.f0, oneBatchData);
+      }
+      // check and commit
+      checkAndCommit(value.f0);
+    } catch (Exception e) {
+      LOG.error("Invoke sink error: " + Thread.currentThread().getId() + ";" + 
this);
+      throw e;
+    }
+  }
+
+  /**
+   * Check and commit if all subtask completed.
+   *
+   * @throws Exception
+   */
+  private boolean checkAndCommit(String instantTime) throws Exception {
+    if (bufferedWriteStatus.get(instantTime).size() == upsertParallelSize) {
+      LOG.info("Instant [{}] process complete, start commit!", instantTime);
+      doCommit(instantTime);
+      bufferedWriteStatus.clear();
+      LOG.info("Instant [{}] commit completed!", instantTime);
+      return true;
+    } else {
+      LOG.info("Instant [{}], can not commit yet, subtask completed : 
[{}/{}]", instantTime, bufferedWriteStatus.get(instantTime).size(), 
upsertParallelSize);
+      return false;
+    }
+  }
+
+  public void doCommit(String instantTime) throws Exception {

Review comment:
       `public` -> `private`

##########
File path: hudi-flink-writer/src/main/java/org/apache/hudi/sink/CommitSink.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.util.StreamerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class CommitSink extends RichSinkFunction<Tuple3<String, 
List<WriteStatus>, Integer>> {
+  private static final Logger LOG = LoggerFactory.getLogger(CommitSink.class);
+  /**
+   * Job conf.
+   */
+  private HudiFlinkStreamer.Config cfg;
+
+  /**
+   * HoodieWriteConfig.
+   */
+  private HoodieWriteConfig writeConfig;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  private Map<String, List<List<WriteStatus>>> bufferedWriteStatus = new 
HashMap<>();
+
+  private Integer upsertParallelSize = 0;
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    // get configs from runtimeContext
+    cfg = (HudiFlinkStreamer.Config) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+    upsertParallelSize = 
getRuntimeContext().getExecutionConfig().getParallelism();
+
+    // HoodieWriteConfig
+    writeConfig = StreamerUtil.getHoodieClientConfig(cfg);
+
+    // writeClient
+    writeClient = new HoodieFlinkWriteClient<>(new 
HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), writeConfig);
+  }
+
+  @Override
+  public void invoke(Tuple3<String, List<WriteStatus>, Integer> value, Context 
context) throws Exception {

Review comment:
       It would be better to give the `value` variable a more descriptive name.

##########
File path: 
hudi-flink-writer/src/main/java/org/apache/hudi/operator/WriteProcessOperator.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class WriteProcessOperator extends KeyedProcessOperator<String, 
HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> {

Review comment:
       WDYT about renaming to `KeyedWriteProcessOperator`? Since you use 
`KeyedWriteProcessFunction`.

##########
File path: 
hudi-flink-writer/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieFlinkStreamerException;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, 
HoodieRecord, Tuple3<String, List<WriteStatus>, Integer>> implements 
CheckpointedFunction {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(KeyedWriteProcessFunction.class);
+  private List<HoodieRecord> records = new LinkedList<>();
+  private Collector<Tuple3<String, List<WriteStatus>, Integer>> output;
+  private int indexOfThisSubtask;
+  private String latestInstant;
+  private boolean hasRecordsIn;
+
+  /**
+   * Job conf.
+   */
+  private HudiFlinkStreamer.Config cfg;
+  /**
+   * Serializable hadoop conf.
+   */
+  private SerializableConfiguration serializableHadoopConf;
+  /**
+   * HoodieWriteConfig.
+   */
+  private HoodieWriteConfig writeConfig;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+
+    indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+
+    cfg = (HudiFlinkStreamer.Config) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+
+    // hadoopConf
+    serializableHadoopConf = new SerializableConfiguration(new 
org.apache.hadoop.conf.Configuration());
+    // HoodieWriteConfig
+    writeConfig = StreamerUtil.getHoodieClientConfig(cfg);
+
+    HoodieFlinkEngineContext context = new 
HoodieFlinkEngineContext(serializableHadoopConf, new 
FlinkTaskContextSupplier(getRuntimeContext()));
+
+    // writeClient
+    writeClient = new HoodieFlinkWriteClient<>(context, writeConfig);
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {

Review comment:
       The declared exception is never thrown here; consider removing the `throws Exception` clause.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to