danny0405 commented on a change in pull request #2430:
URL: https://github.com/apache/hudi/pull/2430#discussion_r560742629



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.streamer.FlinkStreamerConfig;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hoodie Flink config options.
+ *
+ * <p>It has the options for Hoodie table read and write. It also defines some utilities.
+ */
+public class FlinkOptions {
+  // Private constructor: the class cannot be instantiated from outside.
+  private FlinkOptions() {
+  }
+
+  // ------------------------------------------------------------------------
+  //  Base Options
+  // ------------------------------------------------------------------------
+  /**
+   * Base path of the target Hoodie table. The option declares no default value.
+   */
+  public static final ConfigOption<String> PATH =
+      ConfigOptions.key("path")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Base path for the target hoodie table.\n"
+              + "The path would be created if it does not exist,\n"
+              + "otherwise a Hoodie table expects to be initialized successfully");
+
+  /**
+   * Path to a properties file (on local-fs or dfs) that carries additional configurations.
+   * The option declares no default value.
+   */
+  public static final ConfigOption<String> PROPS_FILE_PATH = ConfigOptions
+      .key("properties-file.path")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Path to properties file on local-fs or dfs, with configurations for \n"
+          + "hoodie client, schema provider, key generator and data source. "
+          + "For hoodie client props, sane defaults are\n"
+          + "used, but users are recommended to provide basic things like metrics endpoints, "
+          + "hive configs etc. For sources, refer\n"
+          + "to individual classes, for supported properties");
+
+  // ------------------------------------------------------------------------
+  //  Read Options
+  // ------------------------------------------------------------------------
+  /**
+   * Path of the Avro schema file for the read path. The option declares no default value.
+   */
+  public static final ConfigOption<String> READ_SCHEMA_FILE_PATH =
+      ConfigOptions.key("read.schema.file.path")
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              "Avro schema file path, the parsed schema is used for deserializing");
+
+  // ------------------------------------------------------------------------
+  //  Write Options
+  // ------------------------------------------------------------------------
+  /**
+   * Name of the table, keyed by {@code HoodieWriteConfig.TABLE_NAME}.
+   * The option declares no default value.
+   */
+  public static final ConfigOption<String> TABLE_NAME =
+      ConfigOptions.key(HoodieWriteConfig.TABLE_NAME)
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Table name to register to Hive metastore");
+
+  /**
+   * Type of the table to write; defaults to {@code COPY_ON_WRITE}.
+   */
+  public static final ConfigOption<String> TABLE_TYPE =
+      ConfigOptions.key("write.table.type")
+          .stringType()
+          .defaultValue("COPY_ON_WRITE")
+          .withDescription(
+              "Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
+
+  /**
+   * The write operation to perform; defaults to {@code upsert}.
+   */
+  public static final ConfigOption<String> OPERATION =
+      ConfigOptions.key("write.operation")
+          .stringType()
+          .defaultValue("upsert")
+          .withDescription("The write operation, that this write should do");
+
+  /**
+   * Name of the field used for pre-combining records before the write; defaults to {@code ts}.
+   */
+  public static final ConfigOption<String> PRECOMBINE_FIELD =
+      ConfigOptions.key("write.precombine.field")
+          .stringType()
+          .defaultValue("ts")
+          .withDescription("Field used in preCombining before actual write. "
+              + "When two records have the same\n"
+              + "key value, we will pick the one with the largest value "
+              + "for the precombine field,\n"
+              + "determined by Object.compareTo(..)");
+
+  /**
+   * Payload class name; defaults to the class name of {@link OverwriteWithLatestAvroPayload}.
+   *
+   * <p>NOTE(review): "the option" in the description presumably refers to the precombine
+   * field option - confirm and name it explicitly.
+   */
+  public static final ConfigOption<String> PAYLOAD_CLASS = ConfigOptions
+      .key("write.payload.class")
+      .stringType()
+      .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+      .withDescription("Payload class used. Override this, if you like to roll your own "
+          + "merge logic, when upserting/inserting.\n"
+          + "This will render any value set for the option ineffective");
+
+  /**
+   * Whether duplicates are dropped upon insert; defaults to {@code false},
+   * i.e. insert accepts duplicates.
+   */
+  public static final ConfigOption<Boolean> INSERT_DROP_DUPS =
+      ConfigOptions.key("write.insert.drop.duplicates")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription("Flag to indicate whether to drop duplicates upon insert.\n"
+              + "By default insert will accept duplicates, to gain extra performance");
+
+  /**
+   * Number of retries for a failed checkpoint batch; defaults to {@code 3}.
+   */
+  public static final ConfigOption<Integer> RETRY_TIMES =
+      ConfigOptions.key("write.retry.times")
+          .intType()
+          .defaultValue(3)
+          .withDescription("Flag to indicate how many times streaming job should retry "
+              + "for a failed checkpoint batch.\n"
+              + "By default 3");
+
+  /**
+   * Interval in milliseconds before a retry is issued for a failed checkpoint batch;
+   * defaults to {@code 2000}.
+   */
+  public static final ConfigOption<Long> RETRY_INTERVAL_MS = ConfigOptions
+      .key("write.retry.interval.ms")
+      .longType()
+      .defaultValue(2000L)
+      .withDescription("Flag to indicate how long (by millisecond) before a retry "
+          + "should be issued for failed checkpoint batch.\n"
+          + "By default 2000 and it will be doubled by every retry");
+
+  /**
+   * Whether non-exception errors within a checkpoint batch are ignored;
+   * defaults to {@code true}.
+   */
+  public static final ConfigOption<Boolean> IGNORE_FAILED_BATCH = ConfigOptions
+      .key("write.ignore.failed.batch")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Flag to indicate whether to ignore any non exception error "
+          + "(e.g. writestatus error) within a checkpoint batch.\n"
+          + "By default true (in favor of streaming progressing over data integrity)");
+
+  /**
+   * Name of the record key field, keyed by
+   * {@code KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY}; defaults to {@code uuid}.
+   */
+  public static final ConfigOption<String> RECORD_KEY_FIELD =
+      ConfigOptions.key(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY)
+          .stringType()
+          .defaultValue("uuid")
+          .withDescription("Record key field. Value to be used as the `recordKey` "
+              + "component of `HoodieKey`.\n"
+              + "Actual value will be obtained by invoking .toString() on the field value. "
+              + "Nested fields can be specified using the dot notation eg: `a.b.c`");
+
+  /**
+   * Name of the partition path field, keyed by
+   * {@code KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY}; defaults to
+   * {@code partition-path}.
+   */
+  public static final ConfigOption<String> PARTITION_PATH_FIELD = ConfigOptions
+      .key(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)
+      .stringType()
+      .defaultValue("partition-path")
+      .withDescription("Partition path field. Value to be used as the `partitionPath` "
+          + "component of `HoodieKey`.\n"
+          + "Actual value obtained by invoking .toString()");
+
+  /**
+   * Key generator class name; defaults to the class name of {@link SimpleAvroKeyGenerator}.
+   */
+  public static final ConfigOption<String> KEYGEN_CLASS = ConfigOptions
+      .key("hoodie.datasource.write.keygenerator.class")
+      .stringType()
+      .defaultValue(SimpleAvroKeyGenerator.class.getName())
+      .withDescription(
+          "Key generator class, that will extract the key out of incoming record");
+
+  /**
+   * Parallelism of the tasks that do the actual write; defaults to {@code 4}.
+   */
+  public static final ConfigOption<Integer> WRITE_TASK_PARALLELISM =
+      ConfigOptions.key("write.task.parallelism")
+          .intType()
+          .defaultValue(4)
+          .withDescription("Parallelism of tasks that do actual write, default is 4");
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  /**
+   * List of the optional options; every option listed here declares a default value.
+   * The list is wrapped as unmodifiable so callers cannot overwrite entries of this
+   * shared constant.
+   *
+   * <p>Remember to update the list when adding new options.
+   * NOTE(review): WRITE_TASK_PARALLELISM also declares a default value but is not
+   * listed here - confirm whether that is intentional.
+   */
+  public static final List<ConfigOption<?>> OPTIONAL_OPTIONS =
+      Collections.unmodifiableList(Arrays.asList(
+          TABLE_TYPE, OPERATION, PRECOMBINE_FIELD, PAYLOAD_CLASS, INSERT_DROP_DUPS, RETRY_TIMES,
+          RETRY_INTERVAL_MS, IGNORE_FAILED_BATCH, RECORD_KEY_FIELD, PARTITION_PATH_FIELD,
+          KEYGEN_CLASS));
+
+  /** Prefix for Hoodie specific properties. */
+  public static final String PROPERTIES_PREFIX = "properties.";

Review comment:
       It could, thanks ~




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to