Hi Iceberg Community,
I'm running some experiments with high commit contention (many committers
writing to different partitions of the same Iceberg table) and I'm observing
very high memory usage (5 GB to 7 GB), even though the data being written is
very small.
The scenario is described below:
Note1: The catalog used is similar to the JDBC catalog.
Note2: The data is stored on S3 and HadoopFileSystem is used to talk to S3.
Note3: The Iceberg code is ~6 months old; I haven't tried the latest main branch.
Experiment params:
a. NT = 64 = number of parallel committers. Achieved using multiple threads
within the same process.
b. minWait = COMMIT_MIN_RETRY_WAIT_MS
c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
d. numRetries = COMMIT_NUM_RETRIES = 256 (I increased this to make sure enough
retries are done so that all committers finish successfully).
Steps:
* Create an Iceberg table with three columns: time (timestamp without
timezone), id (int32), value (float64). The partition spec is (time, MONTH).
* Sequential step: create NT different AppendFiles objects.
* Sequential write step: for 1 to NT, write 1 row (in a unique month) and
append the DataFile to the corresponding AppendFiles. Basically, we create one
Parquet file per month (i.e. per partition) containing a single row. This keeps
the data size small for the experiment and ensures that each commit touches a
different partition.
* Parallel commit step: create a thread pool of NT threads, submit a Runnable
that calls AppendFiles.commit() for each committer, and collect the Futures,
i.e. run the commits in parallel.
* Wait for all Futures to finish.
I ran this experiment with various parameter values. For example, I varied
minWait from 100 ms to 5 s, maxWait from 1 s to 60 s, and NT over (8, 16, 32,
64, 128). Code snippets can be found below.
Observations:
A. Total elapsed commit time increases with the number of committers, which is
expected. E.g. with NT=64, minWait=100 ms, maxWait=10 s, the total elapsed
commit time is more than 250 s. This is acceptable given the nature of OCC
under high concurrency.
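For context, my mental model of why the elapsed time grows is that each failed
attempt sleeps with an exponential backoff between minWait and maxWait before
retrying. A minimal sketch (the doubling scale factor is my assumption; I
believe the real implementation also adds jitter):

static long retryDelayMs(int attempt, long minWaitMs, long maxWaitMs) {
  // Delay doubles on each attempt and is capped at maxWait.
  double delay = minWaitMs * Math.pow(2.0, attempt);
  return (long) Math.min(delay, maxWaitMs);
}

With 64 committers racing, most attempts fail and these sleeps dominate the
elapsed time.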
B. The number of table metadata files is a multiple of the number of
committers. E.g. with NT=64, minWait=100 ms, maxWait=10 s, the total number of
table metadata JSON files was 380 (roughly 6 per successful commit, which I
assume is one per commit attempt). This is acceptable given the nature of OCC
under high concurrency.
C. Memory usage keeps shooting up periodically, reaching 7 GB in some
experiments. This becomes noticeable (i.e. memory usage > 1 GB) once the number
of concurrent committers is >= 16, and it gets worse as the number of
committers increases. I've not investigated further, but it could be that the
in-memory metadata (snapshots, etc.) is growing very large. If I serialize the
commit attempts (e.g. by acquiring a lock, as sketched below), the high memory
usage goes away. But I wanted to check here before trying out any alternative.
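The serialized variant is just a coarse lock around commit(), a minimal sketch
of my workaround (COMMIT_LOCK is my own name, not part of the snippets below):

private static final Object COMMIT_LOCK = new Object();

futures.add(executors.submit(() -> {
  // Only one commit (including its retries) runs at a time.
  synchronized (COMMIT_LOCK) {
    appendFilesList.get(i).commit();
  }
}));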
Why is concurrent committing important to us?
We have several users who use various processing engines to schedule their
writes (into non-overlapping partitions) through a data service that takes care
of writing and committing the data. In many cases, they end up in the high
commit contention scenario described above. My main worry is that this is
happening for a single table; if multiple tables are being committed
concurrently, the memory usage will be much larger.
Questions:
1. Have others observed this behavior? Is the high memory usage expected,
or am I doing something wrong? Is there any way to reduce the memory footprint
during the commit (e.g. by changing some metadata config; see the property
sketch after question 2)?
2. What is the general recommendation for highly concurrent committers?
Are highly concurrent committers an anti-pattern for Iceberg?
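For question 1, these are the metadata-related properties I have in mind (names
from TableProperties; I haven't verified whether they reduce commit-time heap
usage, as opposed to just the number of metadata files kept on storage):

ImmutableMap.of(
    // Keep only one previous metadata.json around.
    TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, "1",
    // Delete older metadata files after each successful commit.
    TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true"
)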
Thanks,
Mayur
Code snippets:
Schema schema = new Schema(
    NestedField.required(1, "time", TimestampType.withoutZone()),
    NestedField.required(2, "id", IntegerType.get()),
    NestedField.required(3, "value", DoubleType.get())
);
catalog.createTable(
    tableIdentifier,
    schema,
    PartitionSpec.builderFor(schema).month("time").build(),
    ImmutableMap.of(  // table properties
        TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
        TableProperties.COMMIT_MIN_RETRY_WAIT_MS, String.valueOf(minWait),
        TableProperties.COMMIT_MAX_RETRY_WAIT_MS, String.valueOf(maxWait),
        TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, String.valueOf(1)
    )
);
// Write data phase.
List<AppendFiles> appendFilesList = new ArrayList<>();
for (int m = 0; m < NT; m++) {
  appendFilesList.add(table.newAppend());
}
for (int m = 0; m < NT; m++) {
  // One row per month, so each committer appends to a unique partition.
  LocalDateTime time =
      LocalDateTime.of(2021, 1, 1, 0, 0).plus(m, ChronoUnit.MONTHS);
  ImmutableList<GenericRecord> records =
      ImmutableList.of(createRecord(schema, time, 1, 10.0));
  writeRecords(table, records).forEach(appendFilesList.get(m)::appendFile);
}
// Commit phase.
// High memory usage starts from the commit phase.
ExecutorService executors = Executors.newFixedThreadPool(NT);
List<Future<?>> futures = new ArrayList<>();
for (int m = 0; m < NT; m++) {
  final int i = m;
  // All NT commits race on the same table.
  futures.add(executors.submit(() -> appendFilesList.get(i).commit()));
}
for (int m = 0; m < NT; m++) {
  futures.get(m).get();
}
executors.shutdownNow();
// Snippet of writeRecords().
private static List<DataFile> writeRecords(Table table, List<GenericRecord> records)
    throws IOException {
  // PartitionedWriterImpl extends PartitionedWriter<Record>.
  try (var writer = new PartitionedWriterImpl(table)) {
    for (var record : records) {
      writer.write(record);
    }
    return Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
  }
}
Following is the heap usage for one of the experiments, where the very high
heap usage is visible. The initial low-usage part is the data writes; the high
heap usage starts with the commit phase.
[Attached: heap usage chart]