This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f1e0018 [HUDI-1704] Use PRIMARY KEY syntax to define record keys for
Flink Hudi table (#2694)
f1e0018 is described below
commit f1e0018f12b66770ba3785be7aa8f5f6a80bab6f
Author: Danny Chan <[email protected]>
AuthorDate: Thu Mar 18 20:21:52 2021 +0800
[HUDI-1704] Use PRIMARY KEY syntax to define record keys for Flink Hudi
table (#2694)
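The semantics of the SQL PRIMARY KEY are the same as those of the Hoodie
record key, so declaring a PRIMARY KEY is a more straightforward way to
define the record key than the table option
hoodie.datasource.write.recordkey.field.
After this change, either the PRIMARY KEY or the table option can define
the Hoodie record key; the PRIMARY KEY takes priority if both are
defined. When the PRIMARY KEY or the PARTITIONED BY clause has two or
more columns and the key generator option is left at its default, the
key generator is switched to ComplexAvroKeyGenerator.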
Note: a column with a PRIMARY KEY constraint is forced to be non-nullable.
---
.../apache/hudi/configuration/FlinkOptions.java | 8 ++
.../org/apache/hudi/table/HoodieTableFactory.java | 71 +++++++++++---
.../apache/hudi/table/TestHoodieTableFactory.java | 106 +++++++++++++++++++--
.../org/apache/hudi/utils/TestConfigurations.java | 3 +-
4 files changed, 168 insertions(+), 20 deletions(-)
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 17eee61..5919238 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -377,4 +377,12 @@ public class FlinkOptions {
map.forEach(configuration::setString);
return configuration;
}
+
+ /**
+ * Returns true if {@code option} is not set in {@code conf} or is explicitly set to its default value.
+ */
+ public static <T> boolean isDefaultValueDefined(Configuration conf,
ConfigOption<T> option) {
+ return !conf.getOptional(option).isPresent() // not set: the default applies
+ || conf.get(option).equals(option.defaultValue()); // set to a value equal to the default
+ }
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 36020f4..a2dac36 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,11 +19,14 @@
package org.apache.hudi.table;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TableSinkFactory;
@@ -33,6 +36,8 @@ import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
@@ -43,19 +48,19 @@ import java.util.Map;
* Hoodie data source/sink factory.
*/
public class HoodieTableFactory implements TableSourceFactory<RowData>,
TableSinkFactory<RowData> {
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieTableFactory.class); // e.g. logs key generator class overrides in setupHoodieKeyOptions
+
public static final String FACTORY_ID = "hudi";
@Override
public TableSource<RowData> createTableSource(TableSourceFactory.Context
context) {
Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
- conf.setString(FlinkOptions.TABLE_NAME.key(),
context.getObjectIdentifier().getObjectName());
- conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",",
context.getTable().getPartitionKeys()));
+ TableSchema schema =
TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
+ setupConfOptions(conf, context.getObjectIdentifier().getObjectName(),
context.getTable(), schema);
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
new ValidationException("Option [path] should be not empty.")));
- TableSchema tableSchema =
TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
- inferAvroSchema(conf,
tableSchema.toRowDataType().notNull().getLogicalType());
return new HoodieTableSource(
- tableSchema,
+ schema,
path,
context.getTable().getPartitionKeys(),
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
@@ -65,11 +70,9 @@ public class HoodieTableFactory implements
TableSourceFactory<RowData>, TableSin
@Override
public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
- conf.setString(FlinkOptions.TABLE_NAME.key(),
context.getObjectIdentifier().getObjectName());
- conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",",
context.getTable().getPartitionKeys()));
- TableSchema tableSchema =
TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
- inferAvroSchema(conf,
tableSchema.toRowDataType().notNull().getLogicalType());
- return new HoodieTableSink(conf, tableSchema);
+ TableSchema schema =
TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); // physical schema of the catalog table
+ setupConfOptions(conf, context.getObjectIdentifier().getObjectName(),
context.getTable(), schema); // sets the table name and hoodie key options, infers the avro schema if none is configured
+ return new HoodieTableSink(conf, schema);
}
@Override
@@ -90,6 +93,52 @@ public class HoodieTableFactory implements
TableSourceFactory<RowData>, TableSin
// -------------------------------------------------------------------------
/**
+ * Sets up the config options derived from the table definition, e.g. the table name, the hoodie key options and the inferred Avro schema.
+ *
+ * @param conf The configuration to set up, modified in place
+ * @param tableName The table name, written to {@link FlinkOptions#TABLE_NAME}
+ * @param table The catalog table providing the primary key and the partition keys
+ * @param schema The physical schema used to infer the Avro schema
+ */
+ private static void setupConfOptions(
+ Configuration conf,
+ String tableName,
+ CatalogTable table,
+ TableSchema schema) {
+ // table name
+ conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
+ // hoodie key options: record key, partition path and key generator class
+ setupHoodieKeyOptions(conf, table);
+ // infer the avro schema from the physical DDL schema (only if no avro schema is configured)
+ inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType());
+ }
+
+ /**
+ * Sets up the hoodie key options (record key, partition path and key generator class) from the table definition.
+ */
+ private static void setupHoodieKeyOptions(Configuration conf, CatalogTable
table) {
+ List<String> pkColumns = table.getSchema().getPrimaryKey()
+ .map(UniqueConstraint::getColumns).orElse(Collections.emptyList()); // empty when no PRIMARY KEY is declared
+ if (pkColumns.size() > 0) {
+ // a declared PRIMARY KEY always takes priority over the option FlinkOptions#RECORD_KEY_FIELD
+ String recordKey = String.join(",", pkColumns);
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD, recordKey);
+ }
+ List<String> partitions = table.getPartitionKeys();
+ if (partitions.size() > 0) {
+ // a PARTITIONED BY clause always takes priority over the option FlinkOptions#PARTITION_PATH_FIELD
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",",
partitions));
+ }
+ // switch to ComplexAvroKeyGenerator for multi-column keys, unless a non-default key generator class is configured
+ boolean complexHoodieKey = pkColumns.size() > 1 || partitions.size() > 1; // NOTE(review): only DDL-declared columns are counted; a multi-field key set via table options alone does not trigger the switch
+ if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf,
FlinkOptions.KEYGEN_CLASS)) {
+ conf.setString(FlinkOptions.KEYGEN_CLASS,
ComplexAvroKeyGenerator.class.getName());
+ LOG.info("Table option [{}] is reset to {} because record key or
partition path has two or more fields",
+ FlinkOptions.KEYGEN_CLASS.key(),
ComplexAvroKeyGenerator.class.getName());
+ }
+ }
+
+ /**
* Inferences the deserialization Avro schema from the table schema (e.g.
the DDL)
* if both options {@link FlinkOptions#READ_AVRO_SCHEMA_PATH} and
* {@link FlinkOptions#READ_AVRO_SCHEMA} are not specified.
@@ -97,7 +146,7 @@ public class HoodieTableFactory implements
TableSourceFactory<RowData>, TableSin
* @param conf The configuration
* @param rowType The specified table row type
*/
- private void inferAvroSchema(Configuration conf, LogicalType rowType) {
+ private static void inferAvroSchema(Configuration conf, LogicalType rowType)
{
if (!conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA_PATH).isPresent()
&& !conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA).isPresent()) {
String inferredSchema =
AvroSchemaConverter.convertToSchema(rowType).toString();
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index f64808e..44c030a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -19,11 +19,14 @@
package org.apache.hudi.table;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -36,6 +39,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
import java.util.Objects;
import static org.hamcrest.CoreMatchers.is;
@@ -87,6 +91,38 @@ public class TestHoodieTableFactory {
}
@Test
+ void testSetupHoodieKeyOptionsForSource() {
+ this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField"); // expected to be overridden by the PRIMARY KEY
+ this.conf.setString(FlinkOptions.KEYGEN_CLASS, "dummyKeyGenClass"); // non-default key generator, expected to be kept
+ // definition with a single-column primary key and a single partition path
+ TableSchema schema1 = TableSchema.builder()
+ .field("f0", DataTypes.INT().notNull())
+ .field("f1", DataTypes.VARCHAR(20))
+ .field("f2", DataTypes.TIMESTAMP(3))
+ .primaryKey("f0")
+ .build();
+ final MockSourceContext sourceContext1 =
MockSourceContext.getInstance(this.conf, schema1, "f2");
+ final HoodieTableSource tableSource1 = (HoodieTableSource) new
HoodieTableFactory().createTableSource(sourceContext1);
+ final Configuration conf1 = tableSource1.getConf();
+ assertThat(conf1.get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
+ assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS), is("dummyKeyGenClass"));
+
+ // definition with a composite primary key (f0, f1) and a single partition path
+ this.conf.setString(FlinkOptions.KEYGEN_CLASS,
FlinkOptions.KEYGEN_CLASS.defaultValue()); // default key generator, expected to be switched to ComplexAvroKeyGenerator
+ TableSchema schema2 = TableSchema.builder()
+ .field("f0", DataTypes.INT().notNull())
+ .field("f1", DataTypes.VARCHAR(20).notNull())
+ .field("f2", DataTypes.TIMESTAMP(3))
+ .primaryKey("f0", "f1")
+ .build();
+ final MockSourceContext sourceContext2 =
MockSourceContext.getInstance(this.conf, schema2, "f2");
+ final HoodieTableSource tableSource2 = (HoodieTableSource) new
HoodieTableFactory().createTableSource(sourceContext2);
+ final Configuration conf2 = tableSource2.getConf();
+ assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+ assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS),
is(ComplexAvroKeyGenerator.class.getName()));
+ }
+
+ @Test
void testInferAvroSchemaForSink() {
// infer the schema if not specified
final HoodieTableSink tableSink1 =
@@ -102,6 +138,38 @@ public class TestHoodieTableFactory {
assertNull(conf2.get(FlinkOptions.READ_AVRO_SCHEMA), "expect schema string
as null");
}
+ @Test
+ void testSetupHoodieKeyOptionsForSink() {
+ this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField"); // expected to be overridden by the PRIMARY KEY
+ this.conf.setString(FlinkOptions.KEYGEN_CLASS, "dummyKeyGenClass"); // non-default key generator, expected to be kept
+ // definition with a single-column primary key and a single partition path
+ TableSchema schema1 = TableSchema.builder()
+ .field("f0", DataTypes.INT().notNull())
+ .field("f1", DataTypes.VARCHAR(20))
+ .field("f2", DataTypes.TIMESTAMP(3))
+ .primaryKey("f0")
+ .build();
+ final MockSinkContext sinkContext1 =
MockSinkContext.getInstance(this.conf, schema1, "f2");
+ final HoodieTableSink tableSink1 = (HoodieTableSink) new
HoodieTableFactory().createTableSink(sinkContext1);
+ final Configuration conf1 = tableSink1.getConf();
+ assertThat(conf1.get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
+ assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS), is("dummyKeyGenClass"));
+
+ // definition with a composite primary key (f0, f1) and a single partition path
+ this.conf.setString(FlinkOptions.KEYGEN_CLASS,
FlinkOptions.KEYGEN_CLASS.defaultValue()); // default key generator, expected to be switched to ComplexAvroKeyGenerator
+ TableSchema schema2 = TableSchema.builder()
+ .field("f0", DataTypes.INT().notNull())
+ .field("f1", DataTypes.VARCHAR(20).notNull())
+ .field("f2", DataTypes.TIMESTAMP(3))
+ .primaryKey("f0", "f1")
+ .build();
+ final MockSinkContext sinkContext2 =
MockSinkContext.getInstance(this.conf, schema2, "f2");
+ final HoodieTableSink tableSink2 = (HoodieTableSink) new
HoodieTableFactory().createTableSink(sinkContext2);
+ final Configuration conf2 = tableSink2.getConf();
+ assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+ assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS),
is(ComplexAvroKeyGenerator.class.getName()));
+ }
+
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
@@ -111,13 +179,25 @@ public class TestHoodieTableFactory {
*/
private static class MockSourceContext implements TableSourceFactory.Context
{
private final Configuration conf;
+ private final TableSchema schema; // schema of the table returned by getTable()
+ private final List<String> partitions; // partition keys of the table returned by getTable()
- private MockSourceContext(Configuration conf) {
+ private MockSourceContext(Configuration conf, TableSchema schema,
List<String> partitions) {
this.conf = conf;
+ this.schema = schema;
+ this.partitions = partitions;
}
static MockSourceContext getInstance(Configuration conf) {
- return new MockSourceContext(conf);
+ return getInstance(conf, TestConfigurations.TABLE_SCHEMA,
Collections.singletonList("partition")); // defaults: TestConfigurations.TABLE_SCHEMA partitioned by "partition"
+ }
+
+ static MockSourceContext getInstance(Configuration conf, TableSchema
schema, String partition) {
+ return getInstance(conf, schema, Collections.singletonList(partition)); // single partition key
+ }
+
+ static MockSourceContext getInstance(Configuration conf, TableSchema
schema, List<String> partitions) {
+ return new MockSourceContext(conf, schema, partitions); // the overload the other factory methods delegate to
}
@Override
@@ -127,8 +207,7 @@ public class TestHoodieTableFactory {
@Override
public CatalogTable getTable() {
- return new CatalogTableImpl(TestConfigurations.TABLE_SCHEMA,
Collections.singletonList("partition"),
- conf.toMap(), "mock source table");
+ return new CatalogTableImpl(schema, partitions, conf.toMap(), "mock
source table"); // schema and partition keys come from the getInstance arguments
}
@Override
@@ -142,13 +221,25 @@ public class TestHoodieTableFactory {
*/
private static class MockSinkContext implements TableSinkFactory.Context {
private final Configuration conf;
+ private final TableSchema schema; // schema of the table returned by getTable()
+ private final List<String> partitions; // partition keys of the table returned by getTable()
- private MockSinkContext(Configuration conf) {
+ private MockSinkContext(Configuration conf, TableSchema schema,
List<String> partitions) {
this.conf = conf;
+ this.schema = schema;
+ this.partitions = partitions;
}
static MockSinkContext getInstance(Configuration conf) {
- return new MockSinkContext(conf);
+ return getInstance(conf, TestConfigurations.TABLE_SCHEMA, "partition"); // defaults: TestConfigurations.TABLE_SCHEMA partitioned by "partition"
+ }
+
+ static MockSinkContext getInstance(Configuration conf, TableSchema schema,
String partition) {
+ return getInstance(conf, schema, Collections.singletonList(partition)); // single partition key
+ }
+
+ static MockSinkContext getInstance(Configuration conf, TableSchema schema,
List<String> partitions) {
+ return new MockSinkContext(conf, schema, partitions); // the overload the other factory methods delegate to
}
@Override
@@ -158,8 +249,7 @@ public class TestHoodieTableFactory {
@Override
public CatalogTable getTable() {
- return new CatalogTableImpl(TestConfigurations.TABLE_SCHEMA,
Collections.singletonList("partition"),
- conf.toMap(), "mock sink table");
+ return new CatalogTableImpl(this.schema, this.partitions, conf.toMap(),
"mock sink table"); // schema and partition keys come from the getInstance arguments
}
@Override
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index e32b9c0..eaf5979 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -73,7 +73,8 @@ public class TestConfigurations {
+ " name varchar(10),\n"
+ " age int,\n"
+ " ts timestamp(3),\n"
- + " `partition` varchar(20)\n"
+ + " `partition` varchar(20),\n"
+ + " PRIMARY KEY(uuid) NOT ENFORCED\n"
+ ")\n"
+ "PARTITIONED BY (`partition`)\n"
+ "with (\n"