waytoharish commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2014848588
Thanks for your time @ad1happy2go.
@ad1happy2go @danny0405 Here is the error I am getting after the code change:
14:38:06,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka Source -> (Map -> row_data_to_hoodie_record, Sink: Print to Std. Out) (6/10) (b41f7215661da4d1d3d1c157a58c57e4) switched from RUNNING to FAILED on 8d293143-6737-4451-a2c5-4e4f6f85cbd4 @ localhost (dataPort=-1).
java.io.IOException: Failed to deserialize consumer record due to
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-connector-base-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    ... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    ... 14 more
Caused by: java.lang.IndexOutOfBoundsException
    at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:787) ~[flink-core-1.15.0.jar:1.15.0]
    at org.apache.flink.table.data.binary.BinarySegmentUtils.getInt(BinarySegmentUtils.java:633) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.flink.table.data.binary.BinaryArrayData.pointTo(BinaryArrayData.java:143) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.flink.table.data.binary.BinarySegmentUtils.readArrayData(BinarySegmentUtils.java:1110) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.flink.table.data.binary.BinaryRowData.getArray(BinaryRowData.java:376) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$10(RowData.java:265) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296) ~[flink-table-common-1.15.0.jar:1.15.0]
    at org.apache.hudi.util.RowDataToAvroConverters$11.convert(RowDataToAvroConverters.java:271) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.util.RowDataToAvroConverters$10.convert(RowDataToAvroConverters.java:239) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.sink.transform.RowDataToHoodieFunction.toHoodieRecord(RowDataToHoodieFunction.java:109) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:97) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:46) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    ... 14 more
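From the bottom of the trace, the IndexOutOfBoundsException is thrown while RowDataToAvroConverters reads the ARRAY column back out of the BinaryRowData, which suggests the binary layout of the rows I produce does not line up with the schema the Hudi sink expects (field positions and/or the array element type).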
Here is the latest code:
package com.hudi.flink.quickstart;

import com.test.gen.Datum;
import com.test.gen.Telemetry;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.*;
import org.apache.flink.table.data.binary.BinaryArrayData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryArrayWriter;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.data.writer.BinaryWriter;
import org.apache.flink.table.runtime.typeutils.ArrayDataSerializer;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.table.api.DataTypes.ROW;
/**
* A Flink program that ingests data from Kafka and writes it to Apache Hudi.
*/
public class Kafka2HudiPipelineNew {

    public static void main(String[] args) throws Exception {
        String pipelineName = "TestKafkaPipelIneTest";
        String hudiBasePath = "s3a://awshksharma/" + pipelineName;
        String kafkaGroupId = "myTest";
        String kafkaTopicName = "hksharma";

        // Create a Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure checkpointing
        configureCheckpointing(env);

        // Set up the Kafka source
        KafkaSource<Telemetry> kafkaConsumer = createKafkaConsumer(kafkaTopicName, kafkaGroupId);

        // Create a Kafka stream
        DataStream<Telemetry> kafkaStream = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Transform Kafka data to Hudi records
        DataStream<RowData> transformedStream = kafkaStream.map(new HudiDataSource());

        // Define the Hudi target table and options
        String targetTable = "hudi_hksharma_table_test";
        Map<String, String> options = createHudiOptions(hudiBasePath, targetTable);

        // Define a HoodiePipeline.Builder for configuring the Hudi write
        HoodiePipeline.Builder builder = createHudiPipeline(targetTable, options);
        kafkaStream.print();

        // Write to Hudi
        builder.sink(transformedStream, false);

        // Execute the Flink job (execute must only be called once)
        env.execute(pipelineName);
    }
    // Configure Flink checkpointing settings
    private static void configureCheckpointing(StreamExecutionEnvironment env) {
        env.enableCheckpointing(10000); // Checkpoint every 10 seconds
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMinPauseBetweenCheckpoints(1000); // Minimum time between checkpoints
        checkpointConfig.setCheckpointTimeout(60000); // Checkpoint timeout in milliseconds
        // checkpointConfig.setCheckpointStorage(checkpointLocation);
    }
    // Create a Kafka source with the specified topic and group id
    // (previously the method parameters were ignored in favor of hardcoded values)
    private static KafkaSource<Telemetry> createKafkaConsumer(String topicName, String kafkaGroupId) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", kafkaGroupId);
        return KafkaSource.<Telemetry>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics(topicName)
                .setGroupId(kafkaGroupId)
                .setProperties(properties)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new TelemetryEventDeserializationSchema())
                .build();
    }
    // Create Hudi options for the data sink
    private static Map<String, String> createHudiOptions(String basePath, String tableName) {
        Map<String, String> options = new HashMap<>();
        options.put("path", basePath);
        options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
        options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "glue");
        options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName);
        options.put(FlinkOptions.HIVE_SYNC_DB.key(), "default");
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
        options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
        return options;
    }
    // Create a HoodiePipeline.Builder with the specified target table and options
    private static HoodiePipeline.Builder createHudiPipeline(String targetTable, Map<String, String> options) {
        return HoodiePipeline.builder(targetTable)
                .column("uuid VARCHAR(256)")
                .column("name VARCHAR(10)")
                .column("age INT")
                .column("ts TIMESTAMP(3)")
                .column("data ARRAY<ROW<measure_name VARCHAR(256)>>")
                .column("`partition` VARCHAR(20)")
                .pk("uuid")
                .partition("partition")
                .options(options);
    }
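    // NOTE: the column order above (..., data, partition) differs from the field
    // order in ROW_DATA_TYPE below (..., partition, data), and the partition
    // lengths differ (VARCHAR(20) here vs VARCHAR(10) below). RowData fields are
    // matched to the table schema by position, so I suspect this mismatch is what
    // makes the sink read the ARRAY column from the wrong offset.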
    // Define the schema for Hudi records
    public static final DataType ROW_ARRAY_DATA_TYPE =
            ROW(DataTypes.FIELD("measure_name", DataTypes.VARCHAR(256))).notNull();

    public static final DataType ROW_DATA_TYPE = ROW(
            DataTypes.FIELD("uuid", DataTypes.VARCHAR(256)), // record key
            DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
            DataTypes.FIELD("age", DataTypes.INT()),
            DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
            DataTypes.FIELD("partition", DataTypes.VARCHAR(10)),
            DataTypes.FIELD("data", DataTypes.ARRAY(ROW_ARRAY_DATA_TYPE)))
            .notNull();
    // Build the binary array written into the `data` column
    public static BinaryArrayData insertArrayRow() {
        BinaryArrayData array = new BinaryArrayData();
        BinaryArrayWriter arrayWriter = new BinaryArrayWriter(
                array,
                1,
                BinaryArrayData.calculateFixLengthPartSize(DataTypes.STRING().getLogicalType()));
        arrayWriter.writeString(0, StringData.fromString("par1"));
        arrayWriter.complete();
        return array;
    }
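    // A minimal sketch (untested) of writing the element as a nested ROW instead
    // of a plain string, so it matches the declared element type
    // ROW<measure_name VARCHAR(256)>; RowDataSerializer would come from
    // org.apache.flink.table.runtime.typeutils:
    //
    //   RowType elementType = (RowType) ROW_ARRAY_DATA_TYPE.getLogicalType();
    //   BinaryArrayData array = new BinaryArrayData();
    //   BinaryArrayWriter arrayWriter = new BinaryArrayWriter(
    //       array, 1, BinaryArrayData.calculateFixLengthPartSize(elementType));
    //   arrayWriter.writeRow(0,
    //       insertRow(elementType, StringData.fromString("par1")),
    //       new RowDataSerializer(elementType));
    //   arrayWriter.complete();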
    public static BinaryRowData insertRow(RowType rowType, Object... fields) {
        LogicalType[] types = rowType.getFields().stream()
                .map(RowType.RowField::getType)
                .toArray(LogicalType[]::new);
        BinaryRowData row = new BinaryRowData(fields.length);
        BinaryRowWriter writer = new BinaryRowWriter(row);
        writer.reset();
        for (int i = 0; i < fields.length; i++) {
            Object field = fields[i];
            System.out.println("field>>>>>>>>>>>>>" + field); // plain concat avoids NPE on null fields
            System.out.println("Type>>>>>>>>>>>>>" + types[i]);
            if (field == null) {
                writer.setNullAt(i);
            } else if (field instanceof BinaryArrayData) {
                // The serializer must be built from this field's element type,
                // not from the type of field 0
                BinaryWriter.write(writer, i, field, types[i],
                        new ArrayDataSerializer(((ArrayType) types[i]).getElementType()));
            } else {
                BinaryWriter.write(writer, i, field, types[i], InternalSerializers.create(types[i]));
            }
        }
        writer.complete();
        return row;
    }
    // Overloaded method for creating a Hudi record using the default schema
    public static BinaryRowData insertRow(Object... fields) {
        return insertRow((RowType) ROW_DATA_TYPE.getLogicalType(), fields);
    }
    // Mapper to convert Kafka data to Hudi records
    static class HudiDataSource implements MapFunction<Telemetry, RowData> {
        @Override
        public RowData map(Telemetry kafkaRecord) throws Exception {
            // Field order follows ROW_DATA_TYPE: uuid, name, age, ts, partition, data
            return insertRow(
                    StringData.fromString(kafkaRecord.getCampaignName()),
                    StringData.fromString("Danny"),
                    23,
                    TimestampData.fromEpochMillis(1),
                    StringData.fromString("par1"),
                    insertArrayRow());
        }
    }
}
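If the positional mismatch is indeed the cause, a minimal change I plan to try (untested) is declaring the pipeline columns in the same order and with the same lengths as ROW_DATA_TYPE:

    return HoodiePipeline.builder(targetTable)
            .column("uuid VARCHAR(256)")
            .column("name VARCHAR(10)")
            .column("age INT")
            .column("ts TIMESTAMP(3)")
            .column("`partition` VARCHAR(10)")
            .column("data ARRAY<ROW<measure_name VARCHAR(256)>>")
            .pk("uuid")
            .partition("partition")
            .options(options);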