danny0405 commented on code in PR #6396: URL: https://github.com/apache/hudi/pull/6396#discussion_r946274840
########## hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java: ########## @@ -293,6 +293,47 @@ void testSetupReadOptionsForSource() { assertThat(conf2.getString(FlinkOptions.QUERY_TYPE), is(FlinkOptions.QUERY_TYPE_INCREMENTAL)); } + @Test + void testBucketIndexOptionForSink() { + ResolvedSchema schema1 = SchemaBuilder.instance() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20).notNull()) + .field("f2", DataTypes.TIMESTAMP(3)) + .primaryKey("f0", "f1") + .build(); + + this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name()); + + // default use recordKey fields + final MockContext context = MockContext.getInstance(this.conf, schema1, "f2"); + HoodieTableSink tableSink = (HoodieTableSink) (new HoodieTableFactory().createDynamicTableSink(context)); + final Configuration conf = tableSink.getConf(); + assertThat(conf.getString(FlinkOptions.INDEX_KEY_FIELD), is("f0,f1")); + + this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f0"); + final MockContext context2 = MockContext.getInstance(this.conf, schema1, "f2"); + HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(context2); + final Configuration conf2 = tableSink2.getConf(); + assertThat(conf2.getString(FlinkOptions.INDEX_KEY_FIELD), is("f0")); + + this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f1"); + final MockContext context3 = MockContext.getInstance(this.conf, schema1, "f2"); + HoodieTableSink tableSink3 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(context3); + final Configuration conf3 = tableSink3.getConf(); + assertThat(conf3.getString(FlinkOptions.INDEX_KEY_FIELD), is("f1")); + + this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f0,f1"); + final MockContext context4 = MockContext.getInstance(this.conf, schema1, "f2"); + HoodieTableSink tableSink4 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(context4); + final Configuration conf4 = tableSink4.getConf(); + assertThat(conf4.getString(FlinkOptions.INDEX_KEY_FIELD), is("f0,f1")); + + // index key field is not a subset of or equal to the recordKey fields, will be throw exception + this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f2"); Review Comment: `will be throw exception` -> `will throw exception` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org