harsha27-ops opened a new issue, #11633:
URL: https://github.com/apache/hudi/issues/11633
package com.amazon.cosmos.transactionaldatalake.kda.common.sink;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.HoodieFlinkTable;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
public class HudiSinkFunction extends RichSinkFunction<HoodieRecord<HoodieAvroPayload>> {

    public static final String TRIP_EXAMPLE_SCHEMA = "{\n"
        + "  \"type\": \"record\",\n"
        + "  \"name\": \"triprec\",\n"
        + "  \"fields\": [\n"
        + "    {\"name\": \"ts\", \"type\": \"long\"},\n"
        + "    {\"name\": \"uuid\", \"type\": \"string\"},\n"
        + "    {\"name\": \"path\", \"type\": \"string\"},\n"
        + "    {\"name\": \"driver\", \"type\": \"string\"},\n"
        + "    {\"name\": \"begin_lat\", \"type\": \"double\"},\n"
        + "    {\"name\": \"begin_lon\", \"type\": \"double\"},\n"
        + "    {\"name\": \"end_lat\", \"type\": \"double\"},\n"
        + "    {\"name\": \"end_lon\", \"type\": \"double\"},\n"
        + "    {\"name\": \"fare\", \"type\": \"double\"}\n"
        + "  ]\n"
        + "}";

    private transient HoodieFlinkWriteClient<HoodieAvroPayload> writeClient;
    private final String tablePath = "s3a://testkaharshd4/test";
    private final String tableName = "kaharshd_test";
    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);

        // Configure S3A access for the Hudi table path.
        org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
        hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        hadoopConf.set("fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
        hadoopConf.set("fs.defaultFS", "s3a://testkaharshd4");
        hadoopConf.set("fs.s3a.endpoint", "s3.amazonaws.com");
        StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(hadoopConf);

        // Initialize the table if not already done.
        Path path = new Path(this.tablePath);
        FileSystem fs = HadoopFSUtils.getFs(this.tablePath, storageConf);
        if (!fs.exists(path)) {
            LOG.info("Table path {} not found, initializing table {}", this.tablePath, this.tableName);
            HoodieTableMetaClient.withPropertyBuilder()
                .setTableType(HoodieTableType.MERGE_ON_READ.name())
                .setTableName(this.tableName)
                .setPayloadClassName(HoodieAvroPayload.class.getName())
                .initTable(storageConf, this.tablePath);
        }

        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
            .withPath(this.tablePath)
            .withSchema(TRIP_EXAMPLE_SCHEMA)
            .withParallelism(2, 2)
            .withDeleteParallelism(2)
            .forTable(this.tableName)
            .withIndexConfig(HoodieIndexConfig.newBuilder()
                .withIndexType(IndexType.SIMPLE)
                .withBucketIndexEngineType(BucketIndexEngineType.SIMPLE)
                .build())
            .withArchivalConfig(HoodieArchivalConfig.newBuilder()
                .archiveCommitsWith(20, 30)
                .build())
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withInlineCompaction(true)
                .build())
            .withStorageConfig(HoodieStorageConfig.newBuilder()
                .parquetMaxFileSize(120 * 1024 * 1024)
                .parquetBlockSize(120 * 1024 * 1024)
                .parquetPageSize(1024 * 1024)
                .build())
            .build();
        LOG.info("Record merger record type: {}", cfg.getRecordMerger().getRecordType());

        this.writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(hadoopConf), cfg);
    }
    @Override
    public void invoke(HoodieRecord<HoodieAvroPayload> value, Context context) throws Exception {
        LOG.info("Write config props: {}", this.writeClient.getConfig().getProps());
        LOG.info("Partition path extension: {}", FSUtils.getFileExtension(value.getPartitionPath()));

        // Start a new commit and write the single incoming record.
        String newCommitTime = this.writeClient.startCommit();
        List<HoodieRecord<HoodieAvroPayload>> records = new ArrayList<>();
        records.add(value);
        List<HoodieRecord<HoodieAvroPayload>> writeRecords = records.stream()
            .map(r -> new HoodieAvroRecord<>(r.getKey(), r.getData()))
            .collect(Collectors.toList());

        // Tag the records against the index, then insert them.
        HoodieFlinkTable<HoodieAvroPayload> table = this.writeClient.getHoodieTable();
        this.writeClient.getIndex().tagLocation(
            HoodieListData.eager(writeRecords), this.writeClient.getEngineContext(), table);
        this.writeClient.insert(writeRecords, newCommitTime);
    }
    @Override
    public void close() throws Exception {
        if (this.writeClient != null) {
            this.writeClient.close();
        }
        super.close();
    }
}
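For context, the upstream map operator builds the incoming HoodieRecord<HoodieAvroPayload> elements roughly as in the sketch below. The helper class name, the placeholder field values, and the "partition1" partition path are all illustrative, not my exact setup:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;

import java.util.UUID;

// Hypothetical upstream record builder (sketch; field values are placeholders).
public class TripRecordBuilder {
    public static HoodieRecord<HoodieAvroPayload> build(String schemaJson) {
        Schema schema = new Schema.Parser().parse(schemaJson);
        GenericRecord rec = new GenericData.Record(schema);
        rec.put("ts", System.currentTimeMillis());
        rec.put("uuid", UUID.randomUUID().toString());
        rec.put("path", "partition1");
        rec.put("driver", "driver-1");
        rec.put("begin_lat", 0.0);
        rec.put("begin_lon", 0.0);
        rec.put("end_lat", 0.0);
        rec.put("end_lon", 0.0);
        rec.put("fare", 19.10);
        // uuid is the record key; "partition1" is an assumed partition path.
        HoodieKey key = new HoodieKey(rec.get("uuid").toString(), "partition1");
        return new HoodieAvroRecord<>(key, new HoodieAvroPayload(Option.of(rec)));
    }
}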
I am trying to run this code, but it fails with the following cause:
java.lang.NullPointerException
    at org.apache.hudi.io.FlinkWriteHandleFactory$BaseCommitWriteHandleFactory.create(FlinkWriteHandleFactory.java:107)
    at org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:459)
    at org.apache.hudi.client.HoodieFlinkWriteClient.access$000(HoodieFlinkWriteClient.java:76)
    at org.apache.hudi.client.HoodieFlinkWriteClient$AutoCloseableWriteHandle.<init>(HoodieFlinkWriteClient.java:515)
    at org.apache.hudi.client.HoodieFlinkWriteClient$AutoCloseableWriteHandle.<init>(HoodieFlinkWriteClient.java:507)
    at org.apache.hudi.client.HoodieFlinkWriteClient.insert(HoodieFlinkWriteClient.java:182)
    at com.amazon.cosmos.transactionaldatalake.kda.common.sink.HudiSinkFunction.invoke(HudiSinkFunction.java:101)
    at com.amazon.cosmos.transactionaldatalake.kda.common.sink.HudiSinkFunction.invoke(HudiSinkFunction.java:31)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:423)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:528)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collectWithTimestamp(StreamSourceContexts.java:108)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:1028)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:113)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:315)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:332)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:329)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:1012)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:219)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.lambda$run$0(ShardConsumer.java:126)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:118)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:102)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concu
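From the trace, the NPE is thrown inside FlinkWriteHandleFactory$BaseCommitWriteHandleFactory.create while HoodieFlinkWriteClient#insert is building a write handle, so my unverified guess is that some per-record state the regular Flink write pipeline would have populated (for example the record's assigned bucket/file location) is missing when the client is driven directly from a custom SinkFunction.

For comparison, the documented way to sink a DataStream into Hudi looks roughly like the sketch below, adapted from the Hudi Flink quickstart. The column types are my own mapping of the Avro schema above and the option set is minimal, so treat it as illustrative only:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class HudiPipelineSketch {
    public static void attachSink(DataStream<RowData> dataStream) {
        String targetTable = "kaharshd_test";
        String basePath = "s3a://testkaharshd4/test";

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), basePath);
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");

        HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
            .column("ts BIGINT")
            .column("uuid VARCHAR(40)")
            .column("`path` VARCHAR(256)")
            .column("driver VARCHAR(40)")
            .column("begin_lat DOUBLE")
            .column("begin_lon DOUBLE")
            .column("end_lat DOUBLE")
            .column("end_lon DOUBLE")
            .column("fare DOUBLE")
            .pk("uuid")
            .options(options);

        // false: the input stream is unbounded (Kinesis source)
        builder.sink(dataStream, false);
    }
}

Is direct use of HoodieFlinkWriteClient from a RichSinkFunction supported at all, or do writes have to go through the hudi-flink write operators / HoodiePipeline? If direct use is supported, what setup step am I missing?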