lengkristy opened a new issue, #8636:
URL: https://github.com/apache/iceberg/issues/8636
### Query engine
Java SDK API

Code:
```java
package com.harman.iceberg;

import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.harman.ProcessKinesisRecords;
import com.harman.conf.LambdaConfig;
import com.harman.model.PipelineData;
import com.harman.util.ExceptionUtil;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Table;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitionedFanoutWriter;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class IcebergWriter {

    // Lambda configuration
    private final LambdaConfig lambdaConfig;
    private final LambdaLogger logger;
    private final Random random = new Random();

    private static final long TARGET_FILE_SIZE_IN_BYTES = 1000000L;
    private static final String CATALOG = "glue_catalog";

    private Table table = null;

    public IcebergWriter(LambdaConfig lambdaConfig, LambdaLogger logger) {
        this.lambdaConfig = lambdaConfig;
        this.logger = logger;
    }
    // Save records to the Iceberg table
    public void save(List<PipelineData> records) throws Exception {
        if (records == null || records.isEmpty()) {
            return;
        }

        // Initialize the table and the writer factories
        Table table = null;
        GenericAppenderFactory appenderFactory = null;
        OutputFileFactory outputFileFactory = null;
        try {
            table = loadTable();
            appenderFactory = new GenericAppenderFactory(table.schema());
            // partitionId and taskId are embedded in the generated data file
            // names; random values reduce collisions between concurrent writers
            int partitionId = 1 + random.nextInt(1000);
            long taskId = System.currentTimeMillis();
            outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
                    .format(FileFormat.PARQUET)
                    .build();
        } catch (Exception e) {
            System.out.println("get table error:" + e.getMessage()
                    + ExceptionUtil.convertStackToString(e));
            if (logger != null) {
                logger.log("can not init iceberg table :");
                logger.log(ExceptionUtil.convertStackToString(e));
            }
            throw e;
        }
        // Write the data
        PartitionedFanoutWriter<Record> partitionedFanoutWriter = null;
        try {
            PartitionKey partitionKey = new PartitionKey(table.spec(), table.spec().schema());
            InternalRecordWrapper recordWrapper = new InternalRecordWrapper(table.schema().asStruct());
            // PartitionedFanoutWriter automatically partitions each record and
            // creates one writer per partition
            partitionedFanoutWriter = new PartitionedFanoutWriter<Record>(
                    table.spec(), FileFormat.PARQUET, appenderFactory,
                    outputFileFactory, table.io(), TARGET_FILE_SIZE_IN_BYTES) {
                @Override
                protected PartitionKey partition(Record record) {
                    try {
                        partitionKey.partition(recordWrapper.wrap(record));
                    } catch (Exception e) {
                        // Note: swallowing the exception here means a record that
                        // fails to partition is written to the previous partition
                        System.out.println("write partition data error:" + e.getMessage()
                                + ExceptionUtil.convertStackToString(e));
                    }
                    return partitionKey;
                }
            };
            for (PipelineData data : records) {
                try {
                    if (data.getRecord() != null) {
                        partitionedFanoutWriter.write(data.getRecord());
                    }
                } catch (Exception e) {
                    System.out.println("write data error:" + e.getMessage()
                            + ExceptionUtil.convertStackToString(e));
                    if (logger != null) {
                        logger.log("can not write iceberg table :" + data.getRecord().toString());
                        logger.log(ExceptionUtil.convertStackToString(e));
                        // Record the failed data
                        logger.log(records.toString());
                    }
                    ProcessKinesisRecords.errRecords.add(data);
                }
            }
        } catch (Exception e) {
            System.out.println("write data error:" + e.getMessage()
                    + ExceptionUtil.convertStackToString(e));
            if (logger != null) {
                logger.log("can not write data to iceberg table:");
                logger.log(ExceptionUtil.convertStackToString(e));
            }
            throw e;
        } finally {
            if (partitionedFanoutWriter != null) {
                partitionedFanoutWriter.close();
            }
        }
        // Commit the data
        try {
            // Assumes at least one data file was written
            System.out.println("start commit file, file size:"
                    + partitionedFanoutWriter.dataFiles()[0].recordCount());
            AppendFiles appendFiles = table.newAppend();
            // Add the completed data files to the pending append
            Arrays.stream(partitionedFanoutWriter.dataFiles()).forEach(appendFiles::appendFile);
            // commit() applies and commits the new snapshot; the preceding
            // apply() is not required
            appendFiles.apply();
            appendFiles.commit();
            // Note: rewriteManifests() only returns a RewriteManifests builder;
            // without a .commit() on the result this line has no effect
            table.rewriteManifests();
            System.out.println("committed file");
        } catch (Throwable e) {
            if (logger != null) {
                logger.log("ERROR: can not commit data to iceberg table:" + e.getMessage());
            }
            System.out.println("can not commit");
            ProcessKinesisRecords.errRecords.addAll(records);
            e.printStackTrace();
            throw e;
        }
    }
    public synchronized Table loadTable() {
        if (this.table == null) {
            Catalog catalog = null;
            if (CATALOG.equalsIgnoreCase(this.lambdaConfig.getCatalogName())) {
                catalog = new GlueCatalog();
            }
            if (catalog == null) {
                return null;
            }
            Map<String, String> properties = new HashMap<>();
            properties.put("warehouse", this.lambdaConfig.getWarehouseLocation());
            // DynamoDB-based lock manager used to guard Glue commits
            properties.put("lock-impl", "org.apache.iceberg.aws.dynamodb.DynamoDbLockManager");
            properties.put("lock.table", "ota_fact");
            catalog.initialize(this.lambdaConfig.getCatalogName(), properties);
            TableIdentifier name = TableIdentifier.of(
                    this.lambdaConfig.getDbName(),
                    this.lambdaConfig.getFactTableName());
            this.table = catalog.loadTable(name);
        }
        return this.table;
    }
}
```
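
One knob that may be relevant to the commit step above: Iceberg already retries `CommitFailedException` internally, governed by the `commit.retry.num-retries` table property (`TableProperties.COMMIT_NUM_RETRIES`, default 4). A minimal sketch of raising it, assuming the table returned by `loadTable()` above:

```java
// Sketch: raise Iceberg's built-in commit retry count on the table.
// "commit.retry.num-retries" is the standard Iceberg table property; default is 4.
Table table = loadTable();
table.updateProperties()
        .set("commit.retry.num-retries", "10")
        .commit();
```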
### Question
I use AWS Lambda to write to Iceberg concurrently through the Java SDK. The data comes from Kinesis, there are 4 concurrent writers in total, and I use the DynamoDB lock.
But I get the following errors:
1. `Caused by: software.amazon.awssdk.services.glue.model.ConcurrentModificationException: Update table failed due to concurrent modifications. (Service: Glue, Status Code: 400, Request ID: 2de439bb-065b-4fb0-84f7-0079e1073f3b)`

2. `org.apache.iceberg.exceptions.CommitFailedException: Cannot commit glue_catalog.gu_iot_dev_eu_one_platform_result_9b02b87.fact_ota_data because base metadata location 's3://s3-iot-dev-eu-analytics-result-eu-west-1-7c87510/fact_ota_data/metadata/01974-48d83c2d-5e87-493f-a738-e38d4b2e0f3c.metadata.json' is not same as the current Glue location 's3://s3-iot-dev-eu-analytics-result-eu-west-1-7c87510/fact_ota_data/metadata/01975-f7e5d86f-a1e4-4316-9705-02ec4bb7145d.metadata.json'`
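
The second error means another writer moved the Glue metadata pointer between this writer's read and its commit. A minimal caller-side sketch of handling it: on `CommitFailedException`, refresh the table so the next attempt starts from the metadata location the other writer committed, then rebuild and re-commit the append. The helper `commitWithRetry`, the attempt count, and the backoff are illustrative, not part of the code above:

```java
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitFailedException;

// Hypothetical caller-side retry: refresh the table metadata and re-try the
// append when another writer committed first.
public final class CommitRetry {
    private CommitRetry() {}

    public static void commitWithRetry(Table table, DataFile[] dataFiles, int maxAttempts)
            throws InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try {
                AppendFiles append = table.newAppend();
                for (DataFile file : dataFiles) {
                    append.appendFile(file);
                }
                append.commit(); // Iceberg also retries internally per commit.retry.num-retries
                return;
            } catch (CommitFailedException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
                table.refresh(); // pick up the metadata location the other writer committed
                Thread.sleep(200L * attempt); // simple linear backoff
            }
        }
    }
}
```

The first error appears to come from Glue's `UpdateTable` call inside this same commit path, so a refresh-and-retry of the commit covers it as well.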
