garyli1019 commented on a change in pull request #5018:
URL: https://github.com/apache/hudi/pull/5018#discussion_r830582362
##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
##########
@@ -91,13 +91,14 @@ public static FlinkWriteHelper newInstance() {
   @Override
   public List<HoodieRecord<T>> deduplicateRecords(
       List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+    final boolean hasInsert = records.get(0).getCurrentLocation().getInstantTime().equals("I");

Review comment:
       how about renaming this to `isInsertBucket` and adding a comment to explain why we need it? (see the sketch after the test excerpt below)

##########
File path: hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
##########
@@ -191,6 +191,65 @@ public void testMergeOnReadWriteWithCompaction(String indexType) throws Exceptio
     TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"BUCKET"})
+  public void testCopyOnWriteBucketIndex(String indexType) throws Exception {
+    int parallelism = 4;
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.INDEX_TYPE, indexType);
+    conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
+    conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+    conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(parallelism);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    // Read from file source
+    RowType rowType =
+        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();
+
+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+        rowType,
+        InternalTypeInfo.of(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
+
+    TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+    format.setFilesFilter(FilePathFilter.createDefaultFilter());
+    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+    format.setCharsetName("UTF-8");
+
+    DataStream<RowData> dataStream = execEnv
+        // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+        .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(parallelism);
+
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+    Pipelines.clean(conf, pipeline);
+    JobClient client = execEnv.executeAsync(execEnv.getStreamGraph());
+    if (client.getJobStatus().get() != JobStatus.FAILED) {
+      try {
+        TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish

Review comment:
       is this sleep still needed if we test for COW?
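On the sleep question just above: for a COPY_ON_WRITE table there is no compaction to wait for, so the fixed 20-second sleep seems to exist only to give a checkpoint-triggered commit time to complete before the written data is checked. A possible alternative, sketched under the assumption that the test only needs to see the first completed commit, is to poll the timeline with a timeout instead of sleeping a fixed amount. This would replace the `TimeUnit.SECONDS.sleep(20)` line; the 60-second deadline and the bare Hadoop `Configuration` are arbitrary choices for the sketch, not part of the PR.

```java
// Sketch: wait (with a timeout) until the first commit is completed on the timeline,
// instead of sleeping for a fixed 20 seconds.
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    // use the Hadoop Configuration explicitly to avoid clashing with Flink's Configuration
    .setConf(new org.apache.hadoop.conf.Configuration())
    .setBasePath(tempFile.getAbsolutePath())
    .build();
long deadline = System.currentTimeMillis() + 60_000L;
// reload the timeline on each pass and stop as soon as a completed commit shows up
while (metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()
    && System.currentTimeMillis() < deadline) {
  TimeUnit.SECONDS.sleep(1);
}
```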
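Back on the `hasInsert` flag in the FlinkWriteHelper hunk above: a minimal sketch of the suggested rename plus comment might look like the following. The comment wording is my reading of the "I"/"U" pseudo instant convention used by the Flink bucket assigner, not text taken from the PR, and the rest of the method body is elided.

```java
@Override
public List<HoodieRecord<T>> deduplicateRecords(
    List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
  // The Flink bucket assigner tags a record's current location with the pseudo instant time
  // "I" when it is routed to a new file group (an insert bucket) and "U" when it updates an
  // existing one; the flag lets deduplication distinguish insert buckets from update buckets.
  final boolean isInsertBucket = records.get(0).getCurrentLocation().getInstantTime().equals("I");
  // ... rest of deduplicateRecords unchanged ...
}
```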
##########
File path: hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
##########
@@ -191,6 +191,65 @@ public void testMergeOnReadWriteWithCompaction(String indexType) throws Exceptio
     TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"BUCKET"})
+  public void testCopyOnWriteBucketIndex(String indexType) throws Exception {

Review comment:
       can we use this test for the COW table? include the state index as well
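If the idea is to cover the COW table with both the Flink state index and the bucket index, one way to fold that into this test is sketched below. The test name `testCopyOnWrite` is hypothetical, "FLINK_STATE" is assumed to be an accepted value of `FlinkOptions.INDEX_TYPE`, and the body would reuse the pipeline setup and assertions from the hunk above.

```java
// Sketch: exercise the COW write pipeline with both index types.
@ParameterizedTest
@ValueSource(strings = {"FLINK_STATE", "BUCKET"})
public void testCopyOnWrite(String indexType) throws Exception {
  Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
  conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
  conf.setString(FlinkOptions.INDEX_TYPE, indexType);
  conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
  // bucket-index options only apply when the BUCKET index is selected
  if ("BUCKET".equals(indexType)) {
    conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
    conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
  }
  // ... same source setup, Pipelines.bootstrap/hoodieStreamWrite/clean, and
  // TestData.checkWrittenFullData assertion as in the hunk above ...
}
```

Keeping the bucket-only options behind the `if` avoids them leaking into the state-index run.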