Toroidals commented on issue #10779:
URL: https://github.com/apache/hudi/issues/10779#issuecomment-1972595825
> What catalog did you use then, the `Hudi` catalog in `hms` mode is
expected to be used here instead of the Flink Hive catalog.
Flink is writing to the table "hudi.hudi_test_cdc_01" and Hive is querying the
table "ods.hive_test_cdc_01". The complete code is as follows:
package com.hand.sink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.util.HoodiePipeline;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
/**
* @Author Toroidal
* @Date 2024/2/5 15:23
* @Version 1.0
*/
@Slf4j
public class CustomHudiStreamSink {
public static HoodiePipeline.Builder getHoodieBuilder(HashMap<String,
String> infoMap, HashMap<String, String> connectInfo) {
HoodiePipeline.Builder builder =
HoodiePipeline.builder("hudi_test_cdc_01");
String hudiFieldMap =
infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
ArrayList<ArrayList<String>> fieldList =
JSON.parseObject(hudiFieldMap, new
TypeReference<ArrayList<ArrayList<String>>>() {
});
for (ArrayList<String> columnList : fieldList) {
builder.column("`" + columnList.get(0) + "` " +
columnList.get(1));
}
builder.column("_flink_cdc_connector string");
builder.column("_flink_cdc_db string");
builder.column("_flink_cdc_table string");
builder.column("_flink_cdc_op string");
builder.column("_flink_cdc_ts_ms timestamp");
builder.pk("line_id");
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(),
"hdfs:///apps/hive/warehouse/hudi.db/hudi_test_cdc_01");
options.put(FlinkOptions.TABLE_TYPE.key(),
HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.DATABASE_NAME.key(), "hudi");
options.put(FlinkOptions.TABLE_NAME.key(), "hudi_test_cdc_01");
options.put(FlinkOptions.PRE_COMBINE.key(), "true");
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "_flink_cdc_ts_ms");
options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
options.put(FlinkOptions.INDEX_TYPE.key(),
HoodieIndex.IndexType.BUCKET.name());
options.put(FlinkOptions.BUCKET_ASSIGN_TASKS.key(),
bucketAssignTasks);
options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "128");
options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(),
"CONSISTENT_HASHING");
options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(),
"num_or_time");
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "5");
options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), "150");
options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
options.put(FlinkOptions.HIVE_SYNC_DB.key(), "ods");
options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), "hive_test_cdc_01");
options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(),
connectInfo.get("hive_metastore_url"));
options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(),
connectInfo.get("conn_url"));
options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");
options.put(FlinkOptions.PARTITION_PATH_FIELD.key(),
"_flink_cdc_table");
options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(),
"_flink_cdc_table");
options.put(FlinkOptions.WRITE_RATE_LIMIT.key(), "20000");
options.put(FlinkOptions.OPERATION.key(),
WriteOperationType.UPSERT.value());
builder.options(options);
return builder;
}
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]