garyli1019 commented on a change in pull request #5018:
URL: https://github.com/apache/hudi/pull/5018#discussion_r830582362



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
##########
@@ -91,13 +91,14 @@ public static FlinkWriteHelper newInstance() {
   @Override
   public List<HoodieRecord<T>> deduplicateRecords(
       List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+    final boolean hasInsert = records.get(0).getCurrentLocation().getInstantTime().equals("I");

Review comment:
       How about renaming this to `isInsertBucket` and adding a comment to explain why we need it?
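
For illustration, the renamed flag with such a comment might look like the sketch below (it assumes the `"I"`/`"U"` markers are assigned by the upstream bucket-index write path, as the surrounding code suggests):

```java
// The bucket-index write path tags each record's location with a pseudo
// instant time: "I" when the bucket creates a new file group, "U" when it
// updates an existing one. All records in this batch belong to one bucket,
// so the first record tells us which case we are in.
final boolean isInsertBucket = records.get(0).getCurrentLocation().getInstantTime().equals("I");
```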

##########
File path: hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
##########
@@ -191,6 +191,65 @@ public void testMergeOnReadWriteWithCompaction(String indexType) throws Exceptio
     TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"BUCKET"})
+  public void testCopyOnWriteBucketIndex(String indexType) throws Exception {
+    int parallelism = 4;
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.INDEX_TYPE, indexType);
+    conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
+    conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+    conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(parallelism);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    // Read from file source
+    RowType rowType =
+            (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+                    .getLogicalType();
+
+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+            rowType,
+            InternalTypeInfo.of(rowType),
+            false,
+            true,
+            TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+            .getContextClassLoader().getResource("test_source.data")).toString();
+
+    TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+    format.setFilesFilter(FilePathFilter.createDefaultFilter());
+    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+    format.setCharsetName("UTF-8");
+
+    DataStream<RowData> dataStream = execEnv
+            // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+            .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+            .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+            .setParallelism(parallelism);
+
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+    Pipelines.clean(conf, pipeline);
+    JobClient client = execEnv.executeAsync(execEnv.getStreamGraph());
+    if (client.getJobStatus().get() != JobStatus.FAILED) {
+      try {
+        TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish

Review comment:
       Is this sleep still needed if we test COW? The comment says it waits for compaction, but a copy-on-write table does not schedule compaction.
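
A possible alternative is sketched below: instead of a fixed 20-second sleep, wait with a bounded poll so the test can bail out early if the job fails. The name `waitSeconds` is made up for the sketch; `JobClient#getJobStatus` and `JobStatus#isTerminalState` are existing Flink APIs.

```java
// Hypothetical replacement for the fixed sleep: wait up to a deadline, but
// stop early if the continuously-running job reaches a terminal state
// (e.g. FAILED). For COW there is no compaction to wait on, so the bound
// mostly covers checkpoint-driven commits.
long waitSeconds = 20;
long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(waitSeconds);
while (System.currentTimeMillis() < deadline
    && !client.getJobStatus().get().isTerminalState()) {
  TimeUnit.MILLISECONDS.sleep(500);
}
```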

##########
File path: hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
##########
@@ -191,6 +191,65 @@ public void testMergeOnReadWriteWithCompaction(String indexType) throws Exceptio
     TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"BUCKET"})
+  public void testCopyOnWriteBucketIndex(String indexType) throws Exception {

Review comment:
       Can we use this test for the COW table and include the state index as well?
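
For example, the parameterization could cover both index types. This is only a sketch: the method name `testCopyOnWriteWithIndex` is hypothetical, while `FLINK_STATE` is the existing state-backed value of `FlinkOptions.INDEX_TYPE`.

```java
// Run the same COW pipeline against both the Flink state index and the
// bucket index; the bucket-specific options only matter for BUCKET.
@ParameterizedTest
@ValueSource(strings = {"FLINK_STATE", "BUCKET"})
public void testCopyOnWriteWithIndex(String indexType) throws Exception {
  Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
  conf.setString(FlinkOptions.INDEX_TYPE, indexType);
  if ("BUCKET".equals(indexType)) {
    conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
    conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
  }
  conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
  // ... set up the stream and write pipeline exactly as in the test above ...
}
```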



