Hi Iceberg Community,

I'm running some experiments with high commit contention (many committers 
writing to different partitions of the same Iceberg table) and I'm observing 
very high memory usage (5 GB to 7 GB), even though the data being written is 
very small.

The scenario is described below:

Note1: The catalog used is similar to the JDBC catalog.
Note2: The data is stored on S3 and HadoopFileSystem is used to talk to S3.
Note3: Iceberg code is ~6 months old. I haven't tried the latest main branch.

Experiment params:
a. NT = 64 = number of parallel committers. Achieved using multiple threads 
within the same process.
b. minWait = COMMIT_MIN_RETRY_WAIT_MS
c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
d. numRetries = COMMIT_NUM_RETRIES = 256 (I increased this to make sure enough 
retries are done so that all committers finish successfully).

Steps:

* Create an Iceberg table with three columns: time (timestamp without 
time zone), id (int32), value (float64). The partition spec is (time, MONTH).
* Sequential step: create NT different AppendFiles objects.
* Sequential write step: for m = 1 to NT, write one row (in a unique month) and 
append the DataFile to the corresponding AppendFiles. Basically, we create one 
Parquet file per month (i.e. per partition) containing a single row. This is 
done to keep the data size small for the experiment, and it also ensures that 
each commit touches a different partition.
* Parallel commit step: create a ThreadPool of NT threads, submit a Runnable 
which calls AppendFiles.commit(), and get the Future, i.e. run the commits in 
parallel.
* Wait for all Futures to finish.

I ran this experiment with various parameter values. For example, I varied 
minWait from 100 ms to 5 s, maxWait from 1 s to 60 s, and NT over (8, 16, 32, 
64, 128). Code snippets can be found below.

Observations:
A. Total elapsed commit time increases with the number of committers, which is 
expected. E.g., with NT=64, minWait=100 ms, maxWait=10 s, the total elapsed 
commit time is more than 250 s. This is acceptable given the nature of OCC 
under high concurrency.
B. The number of table metadata files is a multiple of the number of 
committers. E.g., with NT=64, minWait=100 ms, maxWait=10 s, the total number of 
table metadata JSON files was 380. This is acceptable given the nature of OCC 
under high concurrency.
C. Memory usage keeps shooting up periodically, to 7 GB in some experiments. 
This becomes noticeable (i.e. memory usage > 1 GB) when the number of 
concurrent committers is >= 16 and gets worse as the number of committers 
increases. I haven't investigated further, but it could be that the in-memory 
metadata (snapshots, etc.) is growing very large. If I serialize the commit 
attempts (e.g. by acquiring a lock; a sketch is included after the commit-phase 
snippet below), the high memory usage problem goes away. But I wanted to check 
here before trying out any alternative.

Why is concurrent committing important to us?
We have several users who use various processing engines to schedule their 
writes (into non-overlapping partitions) through a data service that takes care 
of writing and committing the data. In many cases, they end up in the high 
commit contention scenario described above. My main worry is that this is 
happening for a single table; if multiple tables are being committed 
concurrently, the memory usage will be much larger.

Questions:

1. Have others observed this behavior? Is the high memory usage expected, or am 
I doing something wrong? Is there any way to reduce the memory footprint during 
the commit (e.g. by changing some metadata config)?

2. What is the general recommendation for highly concurrent committers? Is a 
high number of concurrent committers an anti-pattern for Iceberg?

Thanks,
Mayur

Code snippets:

Schema schema = new Schema(
    NestedField.of(1, false, "time", TimestampType.withoutZone()),
    NestedField.of(2, false, "id", IntegerType.get()),
    NestedField.of(3, false, "value", DoubleType.get())
);

catalog.createTable(
    tableIdentifier,
    schema,
    PartitionSpec.builderFor(schema).month("time").build(),
    ImmutableMap.of(
        TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
        TableProperties.COMMIT_MIN_RETRY_WAIT_MS, String.valueOf(minWait),
        TableProperties.COMMIT_MAX_RETRY_WAIT_MS, String.valueOf(maxWait),
        "write.metadata.previous-versions-max", String.valueOf(1)
    ) // properties
);

// Write data phase.
List<AppendFiles> appendFilesList = new ArrayList<>();
for (int m = 0; m < NT; m++) {
    appendFilesList.add(table.newAppend());
}

for (int m = 0; m < NT; m++) {
    LocalDateTime time = LocalDateTime.of(2021, 1, 1, 0, 0).plus(m, ChronoUnit.MONTHS);
    ImmutableList<GenericRecord> records = ImmutableList.of(createRecord(schema, time, 1, 10.0));
    writeRecords(table, records).forEach(appendFilesList.get(m)::appendFile);
}

// Commit phase.
// High memory usage starts from the commit phase.
ExecutorService executors = Executors.newFixedThreadPool(NT);
List<Future<?>> futures = new ArrayList<>();
for (int m = 0; m < NT; m++) {
    final int i = m;
    futures.add(executors.submit(() -> {
        appendFilesList.get(i).commit();
    }));
}

for (int m = 0; m < NT; m++) {
    futures.get(m).get();
}

executors.shutdownNow();
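
For reference, the serialized variant mentioned in observation C looks roughly 
like the following. This is only a sketch: commitLock is a plain 
java.util.concurrent.locks.ReentrantLock added just for the experiment.

// Serialized commit phase (sketch): only one commit attempt runs at a time.
// With this variant the high memory usage goes away.
final ReentrantLock commitLock = new ReentrantLock();
for (int m = 0; m < NT; m++) {
    final int i = m;
    futures.add(executors.submit(() -> {
        commitLock.lock();
        try {
            appendFilesList.get(i).commit();
        } finally {
            commitLock.unlock();
        }
    }));
}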

// snippet of writeRecords().
private static List<DataFile> writeRecords(Table table, List<GenericRecord> records)
        throws IOException {
    // PartitionedWriterImpl extends PartitionedWriter<Record>
    try (var writer = new PartitionedWriterImpl(table)) {
        for (var record : records) {
            writer.write(record);
        }
        return Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
    }
}
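
The createRecord() helper referenced in the write phase is roughly the 
following (sketch; it just builds a GenericRecord with the three fields of the 
schema above):

// Sketch of createRecord(): builds a GenericRecord matching the (time, id, value) schema.
private static GenericRecord createRecord(Schema schema, LocalDateTime time, int id, double value) {
    GenericRecord record = GenericRecord.create(schema);
    record.setField("time", time);   // timestamp without zone maps to LocalDateTime
    record.setField("id", id);
    record.setField("value", value);
    return record;
}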

Following is the heap usage chart for one of the experiments, where we can see 
very high heap usage. The initial low-usage part is the data writes; the high 
heap usage starts with the commit phase.
[attachment: heap usage chart]
