Hi Iceberg Community,

I'm running some experiments with high commit contention (many threads committing to different partitions of the same Iceberg table) and I'm observing very high memory usage (5 GB to 7 GB), even though the data being written is very small.
The scenario is described below.

Note 1: The catalog used is similar to the JDBC catalog.
Note 2: The data is stored on S3, and HadoopFileSystem is used to talk to S3.
Note 3: The Iceberg code is ~6 months old; I haven't tried the latest main branch.

Experiment params:
a. NT = number of parallel committers = 64, achieved using multiple threads within the same process.
b. minWait = COMMIT_MIN_RETRY_WAIT_MS
c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
d. numRetries = COMMIT_NUM_RETRIES = 256 (increased so that enough retries are available for all committers to finish successfully).

Steps:
* Create an Iceberg table with three columns: time (timestamp without timezone), id (int32), value (float64). The partition spec is (time, MONTH).
* Sequential step: create NT different AppendFiles objects.
* Sequential write step: for 1 to NT, write 1 row (in a unique month) and append the resulting DataFile to the corresponding AppendFiles. In other words, we create one Parquet file per month (i.e. per partition) containing a single row. This keeps the data size small for the experiment and also ensures that each commit touches a different partition.
* Parallel commit step: create a thread pool of NT threads, submit one Runnable per committer that calls AppendFiles.commit(), and collect the Futures, i.e. run the commits in parallel.
* Wait for all the Futures to finish.

I ran this experiment with various values for the params. For example, I varied minWait from 100 ms to 5 s, maxWait from 1 s to 60 s, and NT over (8, 16, 32, 64, 128). Code snippets can be found below.

Observations:
A. The total elapsed commit time increases with the number of committers, which is expected. E.g. with NT = 64, minWait = 100 ms, maxWait = 10 s, the total elapsed commit time is more than 250 s. This is acceptable given the nature of OCC under high concurrency.
B. The number of table metadata files is a multiple of the number of committers. E.g. with NT = 64, minWait = 100 ms, maxWait = 10 s, there were 380 table metadata JSON files. This is also acceptable given the nature of OCC under high concurrency.
C. Memory usage keeps shooting up periodically, reaching 7 GB in some experiments. This is noticeable (i.e. memory usage > 1 GB) once the number of concurrent committers reaches 16, and it gets worse as the number of committers increases. I haven't investigated further, but it could be that the in-memory metadata (snapshots, etc.) grows very large. If I serialize the commit attempts (e.g. by acquiring a lock, as in the sketch right after the questions below), the high memory usage problem goes away. But I wanted to check here before trying any alternative.

Why is concurrent commit important to us? We have several users who use various processing engines to schedule their writes (into non-overlapping partitions) through a data service that takes care of writing and committing the data. In many cases they end up in the high-commit-contention scenario described above. My main worry is that this happens for a single table; if multiple tables are being committed concurrently, the memory usage will be much larger.

Questions:
1. Have others observed this behavior? Is the high memory usage expected, or am I doing something wrong? Is there any way to reduce the memory footprint during the commit (e.g. by changing some metadata config)?
2. What is the general recommendation for highly concurrent committers? Are highly concurrent committers an anti-pattern for Iceberg?
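For reference, the serialized variant mentioned in observation C is roughly the following drop-in replacement for the commit phase in the snippets at the end of this mail (commitLock is just a plain JVM monitor I added for the experiment, not an Iceberg API):

    // Commit phase with the actual commit() calls serialized behind a
    // single lock. The tasks are still submitted to the same thread pool,
    // but only one commit is in flight at a time; with this change the
    // heap stays small.
    final Object commitLock = new Object();
    for (int m = 0; m < NT; m++) {
      final int i = m;
      futures.add(executors.submit(() -> {
        synchronized (commitLock) {
          appendFilesList.get(i).commit();
        }
      }));
    }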
Thanks,
Mayur

Code snippets:

    Schema schema =
        new Schema(
            NestedField.of(1, false, "time", TimestampType.withoutZone()),
            NestedField.of(2, false, "id", IntegerType.get()),
            NestedField.of(3, false, "value", DoubleType.get()));

    catalog.createTable(
        tableIdentifier,
        schema,
        PartitionSpec.builderFor(schema).month("time").build(),
        ImmutableMap.of( // properties
            TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
            TableProperties.COMMIT_MIN_RETRY_WAIT_MS, String.valueOf(minWait),
            TableProperties.COMMIT_MAX_RETRY_WAIT_MS, String.valueOf(maxWait),
            "write.metadata.previous-versions-max", String.valueOf(1)));

    // Write data phase.
    List<AppendFiles> appendFilesList = new ArrayList<>();
    for (int m = 0; m < NT; m++) {
      appendFilesList.add(table.newAppend());
    }
    for (int m = 0; m < NT; m++) {
      LocalDateTime time = LocalDateTime.of(2021, 1, 1, 0, 0).plus(m, ChronoUnit.MONTHS);
      ImmutableList<GenericRecord> records = ImmutableList.of(createRecord(schema, time, 1, 10.0));
      writeRecords(table, records).forEach(appendFilesList.get(m)::appendFile);
    }

    // Commit phase.
    // High memory usage starts from the commit phase.
    ExecutorService executors = Executors.newFixedThreadPool(NT);
    List<Future<?>> futures = new ArrayList<>();
    for (int m = 0; m < NT; m++) {
      final int i = m;
      futures.add(executors.submit(() -> appendFilesList.get(i).commit()));
    }
    for (int m = 0; m < NT; m++) {
      futures.get(m).get();
    }
    executors.shutdownNow();

    // Snippet of writeRecords().
    private static List<DataFile> writeRecords(Table table, List<GenericRecord> records)
        throws IOException {
      // PartitionedWriterImpl extends PartitionedWriter<Record>.
      try (var writer = new PartitionedWriterImpl(table)) {
        for (var record : records) {
          writer.write(record);
        }
        return Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
      }
    }

Following is the heap usage for one of the experiments, where we can see the very high heap usage. The initial low-usage part is the data writes; the high heap usage starts with the commit phase.

[attachment: heap usage chart]
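For completeness, the createRecord() helper used in the write phase just builds one GenericRecord matching the table schema, roughly like this (simplified sketch; the actual helper may differ slightly):

    // Sketch of createRecord(): one record per month/partition with a
    // timestamp-without-zone, an int id, and a double value.
    private static GenericRecord createRecord(Schema schema, LocalDateTime time, int id, double value) {
      GenericRecord record = GenericRecord.create(schema);
      record.setField("time", time);
      record.setField("id", id);
      record.setField("value", value);
      return record;
    }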