lengkristy commented on issue #8636:
URL: https://github.com/apache/iceberg/issues/8636#issuecomment-1734696190
@nastra Sure, these are the Gradle dependencies:
```groovy
implementation 'org.apache.iceberg:iceberg-core:1.3.1'
implementation 'org.apache.iceberg:iceberg-parquet:1.3.1'
implementation 'org.apache.iceberg:iceberg-data:1.3.1'
implementation 'org.apache.iceberg:iceberg-hive-runtime:1.3.1'
implementation('org.apache.hadoop:hadoop-aws:3.3.0') {
    exclude(group: 'com.amazonaws', module: 'aws-java-sdk-bundle')
}
implementation 'org.apache.hadoop:hadoop-common:3.3.0'
implementation 'software.amazon.awssdk:s3:2.20.131'
implementation 'software.amazon.awssdk:glue:2.17.122'
implementation 'software.amazon.awssdk:sts:2.17.122'
implementation 'software.amazon.awssdk:dynamodb:2.17.122'
```
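As an aside, the AWS SDK v2 artifacts above mix 2.20.x and 2.17.x modules, which can cause subtle runtime incompatibilities. A minimal sketch of aligning them through the AWS SDK BOM (the version below is only an example, not a recommendation):

```groovy
// Sketch: pin every AWS SDK v2 module to a single version via the BOM.
// 2.20.131 is an example version.
implementation platform('software.amazon.awssdk:bom:2.20.131')
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:glue'
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:dynamodb'
```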
We write Iceberg table data with 4 concurrent writers. The code is below:
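For context, the `appenderFactory` and `outputFileFactory` referenced in the snippet are constructed roughly as follows. This is only a sketch: the partition/task IDs passed to `builderFor` are placeholders that just need to be unique per concurrent writer.

```java
// Sketch of the factory setup assumed by the write path below.
FileAppenderFactory<Record> appenderFactory =
        new GenericAppenderFactory(table.schema(), table.spec());
// The two IDs are placeholders; give each concurrent writer its own.
OutputFileFactory outputFileFactory =
        OutputFileFactory.builderFor(table, 1, 1)
                .format(FileFormat.PARQUET)
                .build();
```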
```java
// Write data.
PartitionedFanoutWriter<Record> partitionedFanoutWriter = null;
try {
    PartitionKey partitionKey = new PartitionKey(table.spec(), table.spec().schema());
    InternalRecordWrapper recordWrapper = new InternalRecordWrapper(table.schema().asStruct());
    // PartitionedFanoutWriter partitions each record automatically and
    // creates one writer per partition key.
    partitionedFanoutWriter = new PartitionedFanoutWriter<Record>(
            table.spec(), FileFormat.PARQUET, appenderFactory, outputFileFactory,
            table.io(), TARGET_FILE_SIZE_IN_BYTES) {
        @Override
        protected PartitionKey partition(Record record) {
            try {
                partitionKey.partition(recordWrapper.wrap(record));
            } catch (Exception e) {
                System.out.println("write partition data error:" + e.getMessage()
                        + ExceptionUtil.convertStackToString(e));
            }
            return partitionKey;
        }
    };
    for (PipelineData data : records) {
        try {
            if (data.getRecord() != null) {
                partitionedFanoutWriter.write(data.getRecord());
            }
        } catch (Exception e) {
            System.out.println("write data error:" + e.getMessage()
                    + ExceptionUtil.convertStackToString(e));
            if (logger != null) {
                logger.log("can not write iceberg table: " + data.getRecord().toString());
                logger.log(ExceptionUtil.convertStackToString(e));
                // Record the failed batch.
                logger.log(records.toString());
            }
            ProcessKinesisRecords.errRecords.add(data);
        }
    }
} catch (Exception e) {
    System.out.println("write data error:" + e.getMessage()
            + ExceptionUtil.convertStackToString(e));
    if (logger != null) {
        logger.log("can not write data to iceberg table:");
        logger.log(ExceptionUtil.convertStackToString(e));
    }
    throw e;
} finally {
    if (partitionedFanoutWriter != null) {
        partitionedFanoutWriter.close();
    }
}

// Commit data.
try {
    // Assumes at least one data file was produced; dataFiles() is only valid after close().
    System.out.println("start commit, record count of first file:"
            + partitionedFanoutWriter.dataFiles()[0].recordCount());
    AppendFiles appendFiles = table.newAppend();
    // Add the completed data files to the pending append.
    Arrays.stream(partitionedFanoutWriter.dataFiles()).forEach(appendFiles::appendFile);
    // commit() produces and commits the new snapshot; apply() alone does not commit.
    appendFiles.apply();
    appendFiles.commit();
    System.out.println("committed file");
} catch (Throwable e) {
    if (logger != null) {
        logger.log("ERROR: can not commit data to iceberg table:" + e.getMessage());
    }
    System.out.println("can not commit");
    ProcessKinesisRecords.errRecords.addAll(records);
    e.printStackTrace();
    throw e;
}
```
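Because four writers append to the same table concurrently, commits can collide, and Iceberg resolves the conflicts with optimistic retries. A sketch of raising the retry limit via the standard table property `commit.retry.num-retries` (the value is an arbitrary example):

```java
// Sketch: allow more commit retries under concurrent appends.
// COMMIT_NUM_RETRIES maps to "commit.retry.num-retries"; 10 is an example value.
table.updateProperties()
        .set(TableProperties.COMMIT_NUM_RETRIES, "10")
        .commit();
```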
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]