Toroidals commented on issue #10608:
URL: https://github.com/apache/hudi/issues/10608#issuecomment-1925610466
> Only 1.0 release supports concurrent streaming writers.
If Flink checkpointing is disabled, writing works normally, but an error
occurs as soon as checkpointing is enabled. However, writing to a MOR table
with Flink requires checkpointing to be enabled.
/**
 * Builds and runs a Flink streaming job that writes CDC records into a Hudi
 * MERGE_ON_READ table, using the BUCKET index, inline compaction settings and
 * Hive sync. Checkpointing is enabled because the Hudi Flink writer commits
 * on checkpoint completion.
 *
 * <p>Relies on surrounding context (not shown in this snippet) for
 * {@code confInfo}, {@code infoMap}, {@code connectInfo},
 * {@code bucketAssignTasks} and {@code compactionTasks}.
 *
 * @param args unused command-line arguments
 * @throws Exception if job construction or execution fails
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpointing is mandatory for Hudi MOR writes: commits happen on checkpoint.
    env.enableCheckpointing(
        TimeUnit.SECONDS.toMillis(Long.parseLong(confInfo.get("checkpoint_interval"))),
        CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(
        TimeUnit.SECONDS.toMillis(Long.parseLong(confInfo.get("checkpoint_timeout"))));
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(
        Integer.parseInt(confInfo.get("checkpoint_max_concurrent")));
    env.getCheckpointConfig().setCheckpointStorage(confInfo.get("checkpoint_path"));
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(
        Integer.parseInt(confInfo.get("checkpoint_failure_number")));
    env.setRestartStrategy(RestartStrategies.noRestart());

    // RocksDB state backend with incremental checkpoints enabled (ctor arg = true).
    EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);
    stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
    env.setStateBackend(stateBackend);

    // FIXME: addSource() requires a SourceFunction argument (e.g. a Kafka
    // consumer). The original snippet omitted it and does not compile as-is.
    DataStreamSource<String> dataStreamSource = env.addSource();

    HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));

    // Table schema: user columns parsed from a JSON list of [name, type] pairs,
    // followed by fixed CDC metadata columns.
    String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
    ArrayList<ArrayList<String>> fieldList =
        JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList<String>>>() {});
    for (ArrayList<String> columnList : fieldList) {
        // columnList.get(0) = column name (backtick-quoted), columnList.get(1) = column type
        builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
    }
    builder.column("_flink_cdc_connector string");
    builder.column("_flink_cdc_db string");
    builder.column("_flink_cdc_table string");
    builder.column("_flink_cdc_op string");
    builder.column("_flink_cdc_ts_ms timestamp");
    builder.pk(infoMap.get("hudi_primary_key"));

    Map<String, String> options = new HashMap<>();

    // Core write options: MOR table with bucket index and precombine.
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
    options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));
    options.put(FlinkOptions.PRE_COMBINE.key(), "true");
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), infoMap.get("hudi_precombine_field"));
    options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
    options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
    options.put(FlinkOptions.WRITE_TASKS.key(), infoMap.get("hudi_write_tasks"));
    options.put(FlinkOptions.BUCKET_ASSIGN_TASKS.key(), bucketAssignTasks);
    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(),
        infoMap.get("hudi_bucket_index_num_buckets"));
    options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(),
        infoMap.get("hudi_bucket_index_engine_type"));

    // Compaction options.
    options.put(FlinkOptions.COMPACTION_TASKS.key(), compactionTasks);
    options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(),
        infoMap.get("hudi_compaction_trigger_strategy"));
    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(),
        infoMap.get("hudi_compaction_delta_commits"));
    options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(),
        infoMap.get("hudi_compaction_delta_seconds"));

    // Hive sync options.
    options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), infoMap.get("hudi_hive_sync_enabled"));
    options.put(FlinkOptions.HIVE_SYNC_MODE.key(), infoMap.get("hudi_hive_sync_mode"));
    options.put(FlinkOptions.HIVE_SYNC_DB.key(), infoMap.get("hudi_hive_sync_db"));
    options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), infoMap.get("hudi_hive_sync_table"));
    options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
    options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(),
        connectInfo.get("hive_metastore_url"));
    options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), connectInfo.get("conn_url"));
    options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
    options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");

    // Partitioning. NOTE(review): the original snippet first set both of these
    // keys to "_flink_cdc_table" and then immediately overwrote them with the
    // values below; the dead first pair of puts has been removed. Confirm the
    // intended partition field really is hudi_hive_sync_partition_fields.
    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(),
        infoMap.get("hudi_hive_sync_partition_fields"));
    options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(),
        infoMap.get("hudi_hive_sync_partition_fields"));

    options.put(FlinkOptions.WRITE_RATE_LIMIT.key(), infoMap.get("hudi_write_rate_limit"));
    // Per-run client id (timestamp) — presumably to keep concurrent writers
    // distinct; verify against Hudi's multi-writer requirements.
    options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis()));

    builder.options(options);
    builder.sink(dataStreamSource, false); // bounded = false: continuous streaming sink
    env.execute("kafka-to-hudi");
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]