danny0405 commented on code in PR #6396:
URL: https://github.com/apache/hudi/pull/6396#discussion_r946274840


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java:
##########
@@ -293,6 +293,47 @@ void testSetupReadOptionsForSource() {
     assertThat(conf2.getString(FlinkOptions.QUERY_TYPE), 
is(FlinkOptions.QUERY_TYPE_INCREMENTAL));
   }
 
+  @Test
+  void testBucketIndexOptionForSink() {
+    ResolvedSchema schema1 = SchemaBuilder.instance()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20).notNull())
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0", "f1")
+        .build();
+
+    this.conf.setString(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.BUCKET.name());
+
+    // default use recordKey fields
+    final MockContext context = MockContext.getInstance(this.conf, schema1, 
"f2");
+    HoodieTableSink tableSink = (HoodieTableSink) (new 
HoodieTableFactory().createDynamicTableSink(context));
+    final Configuration conf = tableSink.getConf();
+    assertThat(conf.getString(FlinkOptions.INDEX_KEY_FIELD), is("f0,f1"));
+
+    this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f0");
+    final MockContext context2 = MockContext.getInstance(this.conf, schema1, 
"f2");
+    HoodieTableSink tableSink2 = (HoodieTableSink) new 
HoodieTableFactory().createDynamicTableSink(context2);
+    final Configuration conf2 = tableSink2.getConf();
+    assertThat(conf2.getString(FlinkOptions.INDEX_KEY_FIELD), is("f0"));
+
+    this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f1");
+    final MockContext context3 = MockContext.getInstance(this.conf, schema1, 
"f2");
+    HoodieTableSink tableSink3 = (HoodieTableSink) new 
HoodieTableFactory().createDynamicTableSink(context3);
+    final Configuration conf3 = tableSink3.getConf();
+    assertThat(conf3.getString(FlinkOptions.INDEX_KEY_FIELD), is("f1"));
+
+    this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f0,f1");
+    final MockContext context4 = MockContext.getInstance(this.conf, schema1, 
"f2");
+    HoodieTableSink tableSink4 = (HoodieTableSink) new 
HoodieTableFactory().createDynamicTableSink(context4);
+    final Configuration conf4 = tableSink4.getConf();
+    assertThat(conf4.getString(FlinkOptions.INDEX_KEY_FIELD), is("f0,f1"));
+
+    // index key field is not a subset of or equal to the recordKey fields, 
will be throw exception
+    this.conf.setString(FlinkOptions.INDEX_KEY_FIELD, "f2");

Review Comment:
   `will be throw exception` -> `will throw exception`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to