waytoharish commented on issue #10895:
URL: https://github.com/apache/hudi/issues/10895#issuecomment-2014848588

   Thanks for your time @ad1happy2go.
   @ad1happy2go @danny0405 Here is the error I am getting after the code change:
    
    14:38:06,411 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka Source -> (Map -> row_data_to_hoodie_record, Sink: Print to Std. Out) (6/10) (b41f7215661da4d1d3d1c157a58c57e4) switched from RUNNING to FAILED on 8d293143-6737-4451-a2c5-4e4f6f85cbd4 @ localhost (dataPort=-1).
    java.io.IOException: Failed to deserialize consumer record due to
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-connector-base-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
        at java.lang.Thread.run(Thread.java:829) ~[?:?]
    Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        ... 14 more
    Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        ... 14 more
    Caused by: java.lang.IndexOutOfBoundsException
        at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:787) ~[flink-core-1.15.0.jar:1.15.0]
        at org.apache.flink.table.data.binary.BinarySegmentUtils.getInt(BinarySegmentUtils.java:633) ~[flink-table-common-1.15.0.jar:1.15.0]
        at org.apache.flink.table.data.binary.BinaryArrayData.pointTo(BinaryArrayData.java:143) ~[flink-table-common-1.15.0.jar:1.15.0]
        at org.apache.flink.table.data.binary.BinarySegmentUtils.readArrayData(BinarySegmentUtils.java:1110) ~[flink-table-common-1.15.0.jar:1.15.0]
        at org.apache.flink.table.data.binary.BinaryRowData.getArray(BinaryRowData.java:376) ~[flink-table-common-1.15.0.jar:1.15.0]
        at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$10(RowData.java:265) ~[flink-table-common-1.15.0.jar:1.15.0]
        at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296) ~[flink-table-common-1.15.0.jar:1.15.0]
        at org.apache.hudi.util.RowDataToAvroConverters$11.convert(RowDataToAvroConverters.java:271) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.util.RowDataToAvroConverters$10.convert(RowDataToAvroConverters.java:239) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.sink.transform.RowDataToHoodieFunction.toHoodieRecord(RowDataToHoodieFunction.java:109) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:97) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
        at org.apache.hudi.sink.transform.RowDataToHoodieFunction.map(RowDataToHoodieFunction.java:46) ~[hudi-flink1.15-bundle-0.14.0.jar:0.14.0]
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84) ~[flink-core-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
        ... 14 more
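
   Reading the bottom of the trace, the generated field getter calls BinaryRowData.getArray and then fails inside BinaryArrayData.pointTo. As a minimal sketch of that positional read (my assumption, based on the Flink 1.15 table API; ArrayGetterSketch is an illustrative name, and position 4 is my reading of the column order declared in createHudiPipeline below):

       import org.apache.flink.table.api.DataTypes;
       import org.apache.flink.table.data.RowData;
       import org.apache.flink.table.types.logical.LogicalType;

       public class ArrayGetterSketch {
           // For an ARRAY column the generated getter boils down to row.getArray(pos),
           // so the bytes at that slot must have been written as a serialized array.
           public static Object readDataField(RowData row) {
               LogicalType arrayType = DataTypes.ARRAY(
                       DataTypes.ROW(DataTypes.FIELD("measure_name", DataTypes.VARCHAR(256))))
                       .getLogicalType();
               RowData.FieldGetter getter = RowData.createFieldGetter(arrayType, 4); // hypothetical position
               return getter.getFieldOrNull(row); // fails like the trace if slot 4 does not hold array bytes
           }
       }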
        
    Here is the latest code:
        
        package com.hudi.flink.quickstart;
   
   import com.test.gen.Datum;
   import com.test.gen.Telemetry;
   import org.apache.flink.api.common.eventtime.WatermarkStrategy;
   import org.apache.flink.api.common.functions.MapFunction;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
   import org.apache.flink.streaming.api.CheckpointingMode;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.CheckpointConfig;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.api.DataTypes;
   import org.apache.flink.table.data.*;
   import org.apache.flink.table.data.binary.BinaryArrayData;
   import org.apache.flink.table.data.binary.BinaryRowData;
   import org.apache.flink.table.data.writer.BinaryArrayWriter;
   import org.apache.flink.table.data.writer.BinaryRowWriter;
   import org.apache.flink.table.data.writer.BinaryWriter;
   import org.apache.flink.table.runtime.typeutils.ArrayDataSerializer;
   import org.apache.flink.table.runtime.typeutils.InternalSerializers;
   import org.apache.flink.table.types.DataType;
   import org.apache.flink.table.types.logical.LogicalType;
   import org.apache.flink.table.types.logical.RowType;
   import org.apache.hudi.common.model.HoodieTableType;
   import org.apache.hudi.configuration.FlinkOptions;
   import org.apache.hudi.util.HoodiePipeline;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Properties;
   
   import static org.apache.flink.table.api.DataTypes.ROW;
   
   /**
    * A Flink program that ingests data from Kafka and writes it to Apache Hudi.
    */
   public class Kafka2HudiPipelineNew {
   
   
       public static void main(String[] args) throws Exception {
   
           String pipelineName = "TestKafkaPipelIneTest";
           String hudiBasePath = "s3a://awshksharma/"+pipelineName;
           String kafkaGroupId = "myTest";
           String kafkaTopicName = "hksharma";
   
           // Create a Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure checkpointing
        configureCheckpointing(env);
   
           // Set up Kafka source
        KafkaSource<Telemetry> kafkaConsumer = createKafkaConsumer(kafkaTopicName, kafkaGroupId);
   
           // Create a Kafka stream
        DataStream<Telemetry> kafkaStream = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");
   
           // Transform Kafka data to Hudi records
        DataStream<RowData> transformedStream = kafkaStream.map(new HudiDataSource());
   
   
           // Define Hudi target table and options
           String targetTable = "hudi_hksharma_table_test";
        Map<String, String> options = createHudiOptions(hudiBasePath, targetTable);
   
           // Define HoodiePipeline.Builder for configuring the Hudi write
        HoodiePipeline.Builder builder = createHudiPipeline(targetTable, options);
   
   
           kafkaStream.print();
   
           // Write to Hudi
           builder.sink(transformedStream, false);
   
   
   
           env.execute("Api_Sink");
   
           // Execute the Flink job
           env.execute(pipelineName);
       }
   
       // Configure Flink checkpointing settings
    private static void configureCheckpointing(StreamExecutionEnvironment env) {
        env.enableCheckpointing(10000); // Checkpoint every 10 seconds
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMinPauseBetweenCheckpoints(1000); // Minimum time between checkpoints
        checkpointConfig.setCheckpointTimeout(60000); // Checkpoint timeout in milliseconds
        // checkpointConfig.setCheckpointStorage(checkpointLocation);
       }
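
    // Note: as I understand it, the Hudi Flink writer commits on Flink checkpoints,
    // so checkpointing must remain enabled for the sink's writes to become visible.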
   
       // Create a Kafka consumer with specified properties
    private static KafkaSource<Telemetry> createKafkaConsumer(String topicName, String kafkaGroupId) {
           Properties properties = new Properties();
           properties.setProperty("bootstrap.servers", "localhost:9092");
           properties.setProperty("group.id", kafkaGroupId);
   
        return KafkaSource.<Telemetry>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics(topicName)
                .setGroupId(kafkaGroupId)
                .setProperties(properties)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new TelemetryEventDeserializationSchema())
                .build();
       }
   
       // Create Hudi options for the data sink
    private static Map<String, String> createHudiOptions(String basePath, String tableName) {
           Map<String, String> options = new HashMap<>();
           options.put("path", basePath);
   
        options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
           options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "glue");
           options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName);
           options.put(FlinkOptions.HIVE_SYNC_DB.key(), "default");
   
   
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
           options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
           options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
           return options;
       }
   
    // Create a HoodiePipeline.Builder with specified target table and options
    private static HoodiePipeline.Builder createHudiPipeline(String targetTable, Map<String, String> options) {
           return HoodiePipeline.builder(targetTable)
                   .column("uuid VARCHAR(256)")
                   .column("name VARCHAR(10)")
                   .column("age INT")
                   .column("ts TIMESTAMP(3)")
                   .column("data ARRAY<ROW<measure_name VARCHAR(256)>>")
                   .column("`partition` VARCHAR(20)")
                   .pk("uuid")
                   .partition("partition")
                   .options(options);
       }
   
       // Define the schema for Hudi records
   
    public static final DataType ROW_ARRAY_DATA_TYPE =
            ROW(DataTypes.FIELD("measure_name", DataTypes.VARCHAR(256))).notNull();
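
    // NOTE: the field order here (..., partition, data) differs from the column order
    // declared in createHudiPipeline (..., data, partition); binary rows are read
    // positionally, so this mismatch looks like a plausible source of the
    // IndexOutOfBoundsException in getArray above.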
    public static final DataType ROW_DATA_TYPE = ROW(
                    DataTypes.FIELD("uuid", DataTypes.VARCHAR(256)), // record key
                    DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
                    DataTypes.FIELD("age", DataTypes.INT()),
                    DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
                    DataTypes.FIELD("partition", DataTypes.VARCHAR(10)),
                    DataTypes.FIELD("data", DataTypes.ARRAY(ROW_ARRAY_DATA_TYPE)))
            .notNull();
   
   
   
   
       // Create a Hudi record from specified fields
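    // NOTE: insertArrayRow writes a plain STRING element, while the pipeline declares
    // data ARRAY<ROW<measure_name VARCHAR(256)>>; a nested-row sketch is included
    // after the class for comparison.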
   
    public static BinaryArrayData insertArrayRow() {
        BinaryArrayData array = new BinaryArrayData();
        BinaryArrayWriter arrayWriter = new BinaryArrayWriter(
                array,
                1,
                BinaryArrayData.calculateFixLengthPartSize(DataTypes.STRING().getLogicalType()));
        arrayWriter.writeString(0, StringData.fromString("par1"));
        arrayWriter.complete();
        return array;
    }
    public static BinaryRowData insertRow(RowType rowType, Object... fields) {

        LogicalType[] types = rowType.getFields().stream().map(RowType.RowField::getType)
                .toArray(LogicalType[]::new);

        BinaryRowData row = new BinaryRowData(fields.length);
        BinaryRowWriter writer = new BinaryRowWriter(row);

        writer.reset();
        for (int i = 0; i < fields.length; i++) {
            Object field = fields[i];
            System.out.println("field>>>>>>>>>>>>>" + field.toString());
            System.out.println("Type>>>>>>>>>>>>>" + types[i]);

            if (field == null) {
                writer.setNullAt(i);
            } else if (field instanceof BinaryArrayData) {
                // NOTE: types[0] is the uuid VARCHAR type, not this array's element type;
                // ArrayDataSerializer expects the element type, so this may be part of the problem.
                BinaryWriter.write(writer, i, field, types[i], new ArrayDataSerializer(types[0]));
            } else {
                BinaryWriter.write(writer, i, field, types[i], InternalSerializers.create(types[i]));
            }
        }
        writer.complete();
        return row;
    }
   
       // Overloaded method for creating a Hudi record using the default schema
       public static BinaryRowData insertRow(Object... fields) {
           return insertRow((RowType) ROW_DATA_TYPE.getLogicalType(), fields);
       }
   
       // Mapper to convert Kafka data to Hudi records
       static class HudiDataSource implements MapFunction<Telemetry, RowData> {
           @Override
           public RowData map(Telemetry kafkaRecord) throws Exception {
   
   
   
            return insertRow(StringData.fromString(kafkaRecord.getCampaignName()),
                    StringData.fromString("Danny"), 23,
                    TimestampData.fromEpochMillis(1), StringData.fromString("par1"),
                    insertArrayRow());
           }
       }
   }
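
   For comparison, here is a minimal sketch of building the data element as a nested ROW instead of a plain string, with the element serializer derived from the element type (this assumes the Flink 1.15 binary writer API; NestedArraySketch and buildDataArray are illustrative names, and I have not verified this against the Hudi sink):

       import org.apache.flink.table.data.StringData;
       import org.apache.flink.table.data.binary.BinaryArrayData;
       import org.apache.flink.table.data.binary.BinaryRowData;
       import org.apache.flink.table.data.writer.BinaryArrayWriter;
       import org.apache.flink.table.data.writer.BinaryRowWriter;
       import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
       import org.apache.flink.table.types.logical.RowType;
       import org.apache.flink.table.types.logical.VarCharType;

       public class NestedArraySketch {
           public static BinaryArrayData buildDataArray(String measureName) {
               // Element type: ROW<measure_name VARCHAR(256)>, matching the declared column.
               RowType elementType = RowType.of(new VarCharType(256));

               // Build the ROW element first.
               BinaryRowData element = new BinaryRowData(1);
               BinaryRowWriter rowWriter = new BinaryRowWriter(element);
               rowWriter.writeString(0, StringData.fromString(measureName));
               rowWriter.complete();

               // Write it into the array as a ROW, not as a raw string.
               BinaryArrayData array = new BinaryArrayData();
               BinaryArrayWriter arrayWriter = new BinaryArrayWriter(
                       array, 1, BinaryArrayData.calculateFixLengthPartSize(elementType));
               arrayWriter.writeRow(0, element, new RowDataSerializer(elementType));
               arrayWriter.complete();
               return array;
           }
       }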
   
   

