waytoharish opened a new issue, #10859:
URL: https://github.com/apache/hudi/issues/10859
Hi Team,

I am trying to use the HudiDataStreamWriter example to write data to S3 and register the table in the AWS Glue catalog. I can see that the data is successfully written to S3, but the Glue table is never created. Could someone help me understand what the reason could be? I cannot see any errors.

Here is my code and configuration:
```java
package com.hudi.flink.quickstart;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.data.writer.BinaryWriter;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.config.GlueCatalogSyncClientConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
 * This Flink program serves as a demonstration of inserting, updating, and deleting
 * records in a Hudi table using the DataStream API.
 * The program inserts three messages for ten batches. Two of the messages generate a
 * random UUID, acting as new insert records, while one record reuses the same record
 * key, resulting in an update for that record in each batch.
 * In the first batch, three new records are inserted into a newly created Hudi table.
 * Subsequently, after each batch, two new records are inserted, leading to an
 * increment in the count by two with each batch.
 * In the 11th batch, we illustrate the delete operation by publishing a record with
 * the DELETE row kind. As a result, we observe the deletion of the third ID after
 * this batch.
 *
 * After this code finishes you should see a total of 20 records in the Hudi table.
 */
public class HudiDataStreamWriter {
  /**
   * Main entry point; takes two parameters.
   * It can be run with the Flink CLI.
   * Sample command:
   *   bin/flink run -c com.hudi.flink.quickstart.HudiDataStreamWriter \
   *     ${HUDI_EXAMPLES_REPO}/flink/target/hudi-examples-0.1.jar hudi_table "file:///tmp/hudi_table"
   *
   * @param args the table name and the base path
   * @throws Exception on failure
   */
  public static void main(String[] args) throws Exception {
    String pipelineName = "TestKafkaPipelIne";
    String targetTable = "hudiashishtable";
    String basePath = "s3a://awshksharma/" + targetTable;
    String kafkaGroupId = "myTest";

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Enable checkpointing
    configureCheckpointing(env);

    Map<String, String> options = createHudiOptions(basePath);
    DataStreamSource<RowData> dataStream = env.addSource(new SampleDataSource());
    HoodiePipeline.Builder builder = createHudiPipeline(targetTable, options);
    builder.sink(dataStream, false); // The second parameter indicates whether the input data stream is bounded
    env.execute("Api_Sink");
  }
  /**
   * Configure Flink checkpointing settings.
   *
   * @param env The Flink StreamExecutionEnvironment.
   */
  private static void configureCheckpointing(StreamExecutionEnvironment env) {
    env.enableCheckpointing(5000); // Checkpoint every 5 seconds
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkpointConfig.setMinPauseBetweenCheckpoints(10000); // Minimum time between checkpoints
    checkpointConfig.setCheckpointTimeout(60000); // Checkpoint timeout in milliseconds
    //checkpointConfig.setCheckpointStorage("s3a://awshksharma/test/hudi_flink_checkpoint_2");
  }
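
  // NOTE: my understanding is that the Hudi Flink writer only commits data (and
  // triggers any enabled catalog sync) when a checkpoint completes, which is why
  // checkpointing is enabled above.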
  /**
   * Create Hudi options for the data sink.
   *
   * @param basePath The base path for Hudi data.
   * @return A Map containing Hudi options.
   */
  private static Map<String, String> createHudiOptions(String basePath) {
    Map<String, String> options = new HashMap<>();
    /*
    options.put(FlinkOptions.PATH.key(), basePath);
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.DATABASE_NAME.key(), "huditestdb");
    options.put(FlinkOptions.TABLE_NAME.key(), "hudiashishtable");
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
    options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
    options.put(FlinkOptions.IGNORE_FAILED.key(), "true");
    options.put("hoodie.datasource.hive_sync.database", "huditestdb");
    options.put("hoodie.datasource.hive_sync.table", "hudiashishtable");
    options.put("hoodie.datasource.hive_sync.mode", "hms");
    options.put("hoodie.datasource.hive_sync.enable", "true");
    options.put("hoodie.meta.sync.client.tool.class", "org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool");
    */
    options.put("hoodie.table.name", "customer_sample_hudi_001");
    options.put("path", "s3a://test");
    options.put("hoodie.datasource.write.storage.type", "COPY_ON_WRITE");
    options.put("hoodie.datasource.write.recordkey.field", "uuid");
    options.put("hoodie.datasource.write.partitionpath.field", "uuid");
    options.put("hoodie.datasource.write.table.name", "customer_sample_hudi_001");
    options.put("hoodie.datasource.write.operation", "insert_overwrite");
    //options.put("hoodie.datasource.write.precombine.field", "load_timestamp");
    options.put("hoodie.datasource.write.hive_style_partitioning", "true");
    options.put("hoodie.upsert.shuffle.parallelism", "2");
    options.put("hoodie.insert.shuffle.parallelism", "2");
    options.put("hoodie.datasource.hive_sync.mode", "hms");
    options.put("hoodie.datasource.hive_sync.enable", "true");
    options.put("hoodie.datasource.hive_sync.database", "default");
    options.put("hoodie.datasource.hive_sync.table", "customer_sample_hudi_001");
    options.put("hoodie.datasource.hive_sync.partition_fields", "uuid");
    options.put("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor");
    options.put("hoodie.datasource.hive_sync.use_jdbc", "false");
    options.put("hoodie.datasource.hive_sync.ignore_exceptions", "true");
    options.put("hoodie.meta.sync.client.tool.class", "org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool");
    return options;
  }
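
  // NOTE: most of the keys above are the Spark datasource names (hoodie.datasource.*).
  // I am not sure whether the Flink writer reads these or whether it expects the
  // FlinkOptions names (FlinkOptions.HIVE_SYNC_ENABLED, HIVE_SYNC_MODE, HIVE_SYNC_DB,
  // HIVE_SYNC_TABLE); a sketch using those keys is at the end of this issue.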
  /**
   * Create a HoodiePipeline.Builder with the specified target table and options.
   *
   * @param targetTable The name of the Hudi table.
   * @param options     The Hudi options for the data sink.
   * @return A HoodiePipeline.Builder.
   */
  private static HoodiePipeline.Builder createHudiPipeline(String targetTable, Map<String, String> options) {
    return HoodiePipeline.builder("customer_sample_hudi_001")
        .column("ts TIMESTAMP(3)")
        .column("uuid VARCHAR(40)")
        .column("rider VARCHAR(20)")
        .column("driver VARCHAR(20)")
        .column("fare DOUBLE")
        .column("city VARCHAR(20)")
        .pk("uuid")
        .partition("city")
        .options(options);
  }
  /**
   * Sample data source for generating RowData objects.
   */
  static class SampleDataSource implements SourceFunction<RowData> {
    private volatile boolean isRunning = true;

    public static BinaryRowData insertRow(Object... fields) {
      DataType ROW_DATA_TYPE = DataTypes.ROW(
              DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
              DataTypes.FIELD("uuid", DataTypes.VARCHAR(40)), // record key
              DataTypes.FIELD("rider", DataTypes.VARCHAR(20)),
              DataTypes.FIELD("driver", DataTypes.VARCHAR(20)),
              DataTypes.FIELD("fare", DataTypes.DOUBLE()),
              DataTypes.FIELD("city", DataTypes.VARCHAR(20)))
          .notNull();
      RowType ROW_TYPE = (RowType) ROW_DATA_TYPE.getLogicalType();
      LogicalType[] types = ROW_TYPE.getFields().stream().map(RowType.RowField::getType)
          .toArray(LogicalType[]::new);
      BinaryRowData row = new BinaryRowData(fields.length);
      BinaryRowWriter writer = new BinaryRowWriter(row);
      writer.reset();
      for (int i = 0; i < fields.length; i++) {
        Object field = fields[i];
        if (field == null) {
          writer.setNullAt(i);
        } else {
          BinaryWriter.write(writer, i, field, types[i], InternalSerializers.create(types[i]));
        }
      }
      writer.complete();
      return row;
    }
    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
      int batchNum = 0;
      while (isRunning) {
        batchNum++;
        // For every batch, add two new rows with a random uuid and update the row
        // with uuid "334e26e9-8355-45cc-97c6-c31daf0df330"
        List<RowData> DATA_SET_INSERT = Arrays.asList(
            insertRow(TimestampData.fromEpochMillis(1695159649),
                StringData.fromString(UUID.randomUUID().toString()),
                StringData.fromString("rider-A"),
                StringData.fromString("driver-K"), 19.10,
                StringData.fromString("san_francisco")),
            insertRow(TimestampData.fromEpochMillis(1695159649),
                StringData.fromString(UUID.randomUUID().toString()),
                StringData.fromString("rider-B"),
                StringData.fromString("driver-M"), 27.70,
                StringData.fromString("san_francisco")),
            insertRow(TimestampData.fromEpochMillis(1695159649),
                StringData.fromString("334e26e9-8355-45cc-97c6-c31daf0df330"),
                StringData.fromString("rider-C"),
                StringData.fromString("driver-L"), 33.90,
                StringData.fromString("san_francisco"))
        );
        if (batchNum < 11) {
          // For the first 10 batches, insert 3 records: 2 with a random id (INSERTs)
          // and 1 with a hardcoded UUID (UPDATE)
          for (RowData row : DATA_SET_INSERT) {
            ctx.collect(row);
          }
        } else {
          // For the 11th batch, emit only one record, with row kind DELETE
          RowData rowToBeDeleted = DATA_SET_INSERT.get(2);
          rowToBeDeleted.setRowKind(RowKind.DELETE);
          ctx.collect(rowToBeDeleted);
          // Stop the stream once deleted
          isRunning = false;
        }
        TimeUnit.MILLISECONDS.sleep(10000); // Simulate a delay between batches
      }
    }

    @Override
    public void cancel() {
      isRunning = false;
    }
  }
}
```
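
For reference, this is roughly what I was planning to try next: the same sink options expressed with the Flink-side `FlinkOptions` keys instead of the Spark datasource (`hoodie.datasource.*`) keys. This is only an untested sketch based on my reading of `FlinkOptions` (it uses the same imports as the program above), and it assumes the hudi-aws bundle that provides `AwsGlueCatalogSyncTool` is on the Flink classpath:

```java
// Untested sketch: sink options using FlinkOptions keys rather than the
// Spark datasource (hoodie.datasource.*) keys used above.
// Assumes the hudi-aws jar (org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool)
// is on the Flink classpath.
private static Map<String, String> createFlinkStyleOptions(String basePath) {
  Map<String, String> options = new HashMap<>();
  options.put(FlinkOptions.PATH.key(), basePath);
  options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());
  options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
  options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
  // Flink-side hive sync switches (hive_sync.*, not hoodie.datasource.hive_sync.*)
  options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
  options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
  options.put(FlinkOptions.HIVE_SYNC_DB.key(), "default");
  options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), "customer_sample_hudi_001");
  // Route meta sync through the Glue catalog client (same key as above)
  options.put("hoodie.meta.sync.client.tool.class",
      "org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool");
  return options;
}
```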