yanghua commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r583316594



##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -127,21 +136,9 @@ public void bootstrap(Option<Map<String, String>> 
extraMetadata) {
     table.validateUpsertSchema();
     preWrite(instantTime, WriteOperationType.UPSERT);
     final HoodieRecord<T> record = records.get(0);
-    final HoodieRecordLocation loc = record.getCurrentLocation();
-    final String fileID = loc.getFileId();
-    final boolean isInsert = loc.getInstantTime().equals("I");
-    final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
-    if (bucketToHandles.containsKey(fileID)) {
-      writeHandle = bucketToHandles.get(fileID);
-    } else {
-      // create the write handle if not exists
-      writeHandle = isInsert
-          ? new FlinkCreateHandle<>(getConfig(), instantTime, table, 
record.getPartitionPath(),
-          fileID, table.getTaskContextSupplier())
-          : new FlinkMergeHandle<>(getConfig(), instantTime, table, 
records.listIterator(), record.getPartitionPath(),
-          fileID, table.getTaskContextSupplier());
-      bucketToHandles.put(fileID, writeHandle);
-    }
+    final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(
+        record, 
table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),

Review comment:
       Can you extract 
`table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ)` 
into a variable?

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit.delta;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.execution.FlinkLazyInsertIterable;
+import org.apache.hudi.io.ExplicitWriteHandleFactory;
+import org.apache.hudi.io.FlinkAppendHandle;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class BaseFlinkDeltaCommitActionExecutor<T extends 
HoodieRecordPayload<T>>
+    extends BaseFlinkCommitActionExecutor<T> {
+
+  public BaseFlinkDeltaCommitActionExecutor(HoodieEngineContext context,
+                                            HoodieWriteHandle<?, ?, ?, ?> 
writeHandle,
+                                            HoodieWriteConfig config,
+                                            HoodieTable table,
+                                            String instantTime,
+                                            WriteOperationType operationType) {
+    super(context, writeHandle, config, table, instantTime, operationType);
+    Preconditions.checkArgument(writeHandle instanceof FlinkAppendHandle,
+        "MOR write handle should always be a FlinkAppendHandle");

Review comment:
       Can we avoid hard-coding `FlinkAppendHandle` here?

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/operator/CopyOnWriteTest.java
##########
@@ -58,13 +57,13 @@
 /**
  * Test cases for StreamingSinkFunction.
  */
-public class StreamWriteFunctionTest {
+public class CopyOnWriteTest {

Review comment:
       Could we prefix the class name with `Test` and give it a more descriptive name?

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -160,7 +157,12 @@ public void bootstrap(Option<Map<String, String>> 
extraMetadata) {
         getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
     table.validateUpsertSchema();
     preWrite(instantTime, WriteOperationType.INSERT);
-    HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, 
instantTime, records);
+    // create the write handle if not exists
+    final HoodieRecord<T> record = records.get(0);
+    final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(
+        record, 
table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),

Review comment:
       ditto

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/operator/CopyOnWriteTest.java
##########
@@ -464,21 +476,33 @@ private void checkInstantState(
     final String instant;
     switch (state) {
       case REQUESTED:
-        instant = writeClient.getInflightAndRequestedInstant("COPY_ON_WRITE");
+        instant = writeClient.getInflightAndRequestedInstant(getTableType());
         break;
       case COMPLETED:
-        instant = writeClient.getLastCompletedInstant("COPY_ON_WRITE");
+        instant = writeClient.getLastCompletedInstant(getTableType());
         break;
       default:
         throw new AssertionError("Unexpected state");
     }
     assertThat(instant, is(instantStr));
   }
 
+  protected String getTableType() {
+    return "COPY_ON_WRITE";

Review comment:
       Replace it with `HoodieTableType.COPY_ON_WRITE.name()`?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -178,6 +178,14 @@ private void init(HoodieRecord record) {
     }
   }
 
+  /**
+   * Returns whether the hoodie record is an UPDATE.
+   */
+  protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
+    // If currentLocation is present, then this is an update
+    return hoodieRecord.getCurrentLocation() != null;

Review comment:
       In previous PRs, you used the `instantTime` of `HoodieRecordLocation` to 
determine whether a record is an update, right? So let's keep that consistent?

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/operator/MergeOnReadCompactTest.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test cases for delta stream write with compaction.
+ */
+public class MergeOnReadCompactTest extends CopyOnWriteTest {
+
+  @Override
+  protected void setUp(Configuration conf) {
+    // trigger the compaction for every finished checkpoint
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+  }
+
+  @Disabled
+  @Test
+  public void testIndexStateBootstrap() {
+    // Ignore the index bootstrap because we only support parquet load now.
+  }
+
+  Map<String, String> getMiniBatchExpected() {
+    Map<String, String> expected = new HashMap<>();
+    // MOR mode merges the messages with the same key.
+    expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
+    return expected;
+  }
+
+  @Override
+  protected String getTableType() {
+    return "MERGE_ON_READ";

Review comment:
       ditto

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssigners.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator.partitioner;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.partitioner.delta.DeltaBucketAssigner;
+
+/**
+ * Utilities for {@code BucketAssigner}.
+ */
+public abstract class BucketAssigners {
+
+  private BucketAssigners() {}
+
+  /**
+   * Creates a {@code BucketAssigner}.
+   *
+   * @param tableType The table type
+   * @param context   The engine context
+   * @param config    The configuration
+   * @return the bucket assigner instance
+   */
+  public static BucketAssigner create(
+      HoodieTableType tableType,
+      HoodieFlinkEngineContext context,
+      HoodieWriteConfig config) {
+    switch (tableType) {
+      case COPY_ON_WRITE:
+        return new BucketAssigner(context, config);
+      case MERGE_ON_READ:
+        return new DeltaBucketAssigner(context, config);
+      default:
+        throw new AssertionError();

Review comment:
       Could we throw an unsupported-operation exception here instead of a bare `AssertionError`?

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -207,13 +209,40 @@ public void bootstrap(Option<Map<String, String>> 
extraMetadata) {
   }
 
   @Override
-  public void commitCompaction(String compactionInstantTime, List<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata) throws IOException {
-    throw new HoodieNotSupportedException("Compaction is not supported yet");
+  public void commitCompaction(
+      String compactionInstantTime,
+      List<WriteStatus> writeStatuses,
+      Option<Map<String, String>> extraMetadata) throws IOException {
+    HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, 
(HoodieFlinkEngineContext) context);
+    HoodieCommitMetadata metadata = 
FlinkCompactHelpers.newInstance().createCompactionMetadata(
+        table, compactionInstantTime, writeStatuses, config.getSchema());
+    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
+    completeCompaction(metadata, writeStatuses, table, compactionInstantTime);
   }
 
   @Override
-  protected void completeCompaction(HoodieCommitMetadata metadata, 
List<WriteStatus> writeStatuses, HoodieTable<T, List<HoodieRecord<T>>, 
List<HoodieKey>, List<WriteStatus>> table, String compactionCommitTime) {
-    throw new HoodieNotSupportedException("Compaction is not supported yet");
+  public void completeCompaction(
+      HoodieCommitMetadata metadata,
+      List<WriteStatus> writeStatuses,
+      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> table,
+      String compactionCommitTime) {
+    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect 
compaction write status and commit compaction");
+    List<HoodieWriteStat> writeStats = 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+    finalizeWrite(table, compactionCommitTime, writeStats);
+    LOG.info("Committing Compaction " + compactionCommitTime + ". Finished 
with result " + metadata);

Review comment:
       Could you make the concatenated log message read as a complete sentence?

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/operator/MergeOnReadWriteTest.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test cases for delta stream write.
+ */
+public class MergeOnReadWriteTest extends CopyOnWriteTest {
+  private FileSystem fs;
+  private HoodieWriteConfig writeConfig;
+  private HoodieFlinkEngineContext context;
+
+  @BeforeEach
+  public void before() throws Exception {
+    super.before();
+    fs = FSUtils.getFs(tempFile.getAbsolutePath(), new 
org.apache.hadoop.conf.Configuration());
+    writeConfig = StreamerUtil.getHoodieClientConfig(conf);
+    context = new HoodieFlinkEngineContext(
+        new SerializableConfiguration(StreamerUtil.getHadoopConf()),
+        new FlinkTaskContextSupplier(null));
+  }
+
+  @Override
+  protected void checkWrittenData(File baseFile, Map<String, String> expected, 
int partitions) throws Exception {
+    HoodieTableMetaClient metaClient = HoodieFlinkTable.create(writeConfig, 
context).getMetaClient();
+    Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+    String latestInstant = 
metaClient.getCommitsTimeline().filterCompletedInstants()
+        .getInstants()
+        .filter(x -> 
x.getAction().equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toList()).stream()
+        .max(Comparator.naturalOrder())
+        .orElse(null);
+    TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, 
partitions, schema);
+  }
+
+  @Disabled
+  @Test
+  public void testIndexStateBootstrap() {
+    // Ignore the index bootstrap because we only support parquet load now.
+  }
+
+  Map<String, String> getMiniBatchExpected() {
+    Map<String, String> expected = new HashMap<>();
+    // MOR mode merges the messages with the same key.
+    expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
+    return expected;
+  }
+
+  @Override
+  protected String getTableType() {
+    return "MERGE_ON_READ";

Review comment:
       ditto




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to