We are in the process of supporting multiple file system schemes using ResolvingFileIO; Ryan just added the initial implementation: https://github.com/apache/iceberg/pull/3593
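Roughly, a catalog can opt into it through the standard io-impl catalog property; a minimal sketch is below (this is not code from the PR, and the warehouse location is just a placeholder):

    // Sketch only: "catalog" stands for whatever Catalog implementation is in use
    // (e.g. a JDBC-style catalog).
    Map<String, String> catalogProps = ImmutableMap.of(
        // "io-impl": ResolvingFileIO picks a concrete FileIO based on each path's scheme
        CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.io.ResolvingFileIO",
        // hypothetical warehouse location
        CatalogProperties.WAREHOUSE_LOCATION, "gs://my-bucket/warehouse");
    catalog.initialize("my_catalog", catalogProps);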
-Jack

On Tue, Nov 30, 2021 at 6:41 PM Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:

Thanks Ryan.

I’m looking at the heap dump. At a preliminary look in jvisualvm, I see the following top two objects:

1. ‘byte[]’: 87% of memory usage (>100k instances totaling 3.2 GB in one of my tests). I checked some of the references and found that they come from com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader <- GoogleHadoopOutputStream <- AvroFileAppender. I also see references coming from WriterBasedJsonGenerator, a finalizer (HadoopPositionOutputStream), etc. I’m not familiar with this code, but is it possible that the Hadoop output streams are not closed and close is only called by the finalizers?

2. ‘int[]’: 12% of memory usage (7k instances), but I can’t expand the references.

One interesting finding is that if I switch to S3FileIO, the high memory usage goes away and the memory usage is similar to the serialized commits using a lock, which is ~750 MB for 128 parallel committers. That 750 MB may fall in line with the snapshot and manifest* objects.

So, the high memory problem manifests only when using the default HadoopFileSystem.

Thanks,
Mayur

PS: I had to change S3FileIO locally to accept the gs:// prefix so that it works with GCS. Is there a plan to support the gs:// prefix in S3URI?

*From:* Ryan Blue <b...@tabular.io>
*Sent:* Tuesday, November 30, 2021 3:53 PM
*To:* Iceberg Dev List <dev@iceberg.apache.org>
*Subject:* Re: High memory usage with highly concurrent committers

Mayur,

Is it possible to connect to this process with a profiler and look at what's taking up all of the space?

I suspect that what's happening here is that you're loading the list of snapshots for each version of metadata, so you're holding a lot of copies of the entire snapshot history and possibly caching the list of manifests for some snapshots as well.

I've thought about adding a way to avoid parsing and loading snapshots, probably by passing a cache when loading metadata so that all the copies of a table can share snapshots in memory. That would work fine because they're immutable. That might help you here, although a Snapshot instance will cache manifests after loading them if they are accessed, so you'd want to watch out for that as well.

The best step forward is to get an idea of what objects are taking up that space with a profiler or heap dump if you can.

Ryan

On Tue, Nov 30, 2021 at 12:34 PM Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:

Hi Iceberg Community,

I’m running some experiments with high commit contention (on the same Iceberg table, writing to different partitions) and I'm observing very high memory usage (5 GB to 7 GB). (Note that the data being written is very small.)

*The scenario is described below:*

*Note1: The catalog used is similar to the JDBC catalog.*
*Note2: The data is stored on S3 and HadoopFileSystem is used to talk to S3.*
*Note3: The Iceberg code is ~6 months old. I haven’t tried the latest main branch.*

*Experiment params:*

a. NT = 64 = number of parallel committers, achieved using multiple threads within the same process.
b. minWait = COMMIT_MIN_RETRY_WAIT_MS
c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
d. numRetries = COMMIT_NUM_RETRIES = 256 (I increased this to make sure enough retries are done so that all committers finish successfully).
*Steps:*

* *Create an Iceberg table* with three columns: time (timestamp without timezone), id (int32), value (float64). The partition spec is (time, MONTH).
* Sequential step: create *NT different AppendFiles* objects.
* Sequential write step: for 1 to NT, *write 1 row* (in a unique month) and append the DataFile to the corresponding AppendFiles. Basically, we create one Parquet file per month (i.e. per partition) containing a single row. This keeps the data size small for the experiment and ensures that each commit touches a different partition.
* *Parallel commit step*: create a thread pool of NT threads, submit a Runnable which calls *AppendFiles.commit()*, and get the Future, i.e. run the commits in parallel.
* Wait for all Futures to finish.

I ran this experiment with various values for the params. For example, I varied minWait from 100 ms to 5 s, maxWait from 1 s to 60 s, and NT in (8, 16, 32, 64, 128). Code snippets can be found below.

*Observations:*

A. The total elapsed commit time increases with the number of committers, which is expected. E.g. with NT=64, minWait=100 ms, maxWait=10 s, the total elapsed commit time is more than 250 s. This is acceptable given the nature of OCC under high concurrency.

B. The number of table metadata files is a multiple of the number of committers. E.g. with NT=64, minWait=100 ms, maxWait=10 s, the total number of table metadata JSON files was 380. This is acceptable given the nature of OCC under high concurrency.

*C. The memory usage keeps shooting up periodically, to 7 GB in some experiments. This is noticeable (i.e. memory usage > 1 GB) when the number of concurrent committers is >= 16 and becomes worse as the number of committers increases. I’ve not investigated further, but it could be that the in-memory metadata (snapshots, etc.) is growing very large. If I serialize the commit attempts (e.g. by acquiring a lock), the high memory usage goes away. But I wanted to check here before trying out any alternative.*

*Why is the concurrent commit important to us?*

We have several users who use various processing engines to schedule their writes (into non-overlapping partitions) through a data service that takes care of writing and committing the data. In many cases, they end up in the high commit contention scenario described above. My main worry is that this is happening for a single table; if multiple tables are being committed, the memory usage will be much larger.

*Questions:*

1. Have others observed this behavior? Is the high memory usage expected, or am I doing something wrong? Is there any way to reduce the memory footprint (e.g. by changing some metadata config; see the sketch below) during the commit?

2. What is the general recommendation for highly concurrent committers? Are highly concurrent committers an anti-pattern for Iceberg?
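For question 1, the only knobs I’m aware of are the metadata-retention table properties sketched below (constants from TableProperties); as far as I can tell they only bound the metadata.json files kept on disk, so they may not address the in-memory growth:

    ImmutableMap.of(
        // keep at most one previous metadata.json ("write.metadata.previous-versions-max")
        TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, "1",
        // remove superseded metadata files after each commit
        // ("write.metadata.delete-after-commit.enabled")
        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true"
    )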
Thanks,
Mayur

*Code snippets:*

    Schema schema = new Schema(
        NestedField.of(1, false, "time", TimestampType.withoutZone()),
        NestedField.of(2, false, "id", IntegerType.get()),
        NestedField.of(3, false, "value", DoubleType.get())
    );

    catalog.createTable(
        tableIdentifier,
        schema,
        PartitionSpec.builderFor(schema).month("time").build(),
        ImmutableMap.of(
            TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
            TableProperties.COMMIT_MIN_RETRY_WAIT_MS, String.valueOf(minWait),
            TableProperties.COMMIT_MAX_RETRY_WAIT_MS, String.valueOf(maxWait),
            "write.metadata.previous-versions-max", String.valueOf(1)
        ) // properties
    );

    // Write data phase.
    List<AppendFiles> appendFilesList = new ArrayList<>();
    for (int m = 0; m < NT; m++) {
        appendFilesList.add(table.newAppend());
    }

    for (int m = 0; m < NT; m++) {
        LocalDateTime time = LocalDateTime.of(2021, 1, 1, 0, 0).plus(m, ChronoUnit.MONTHS);
        ImmutableList<GenericRecord> records = ImmutableList.of(createRecord(schema, time, 1, 10.0));
        writeRecords(table, records).forEach(appendFilesList.get(m)::appendFile);
    }

    // Commit phase.
    // High memory usage starts from the commit phase.
    ExecutorService executors = Executors.newFixedThreadPool(NT);
    List<Future<?>> futures = new ArrayList<>();
    for (int m = 0; m < NT; m++) {
        final int i = m;
        futures.add(executors.submit(() -> {
            appendFilesList.get(i).commit();
        }));
    }

    // Wait for all NT commits to finish.
    for (int m = 0; m < NT; m++) {
        futures.get(m).get();
    }

    executors.shutdownNow();

    // snippet of writeRecords().
    private static List<DataFile> writeRecords(Table table, List<GenericRecord> records)
            throws IOException {
        // PartitionedWriterImpl extends PartitionedWriter<Record>
        try (var writer = new PartitionedWriterImpl(table)) {
            for (var record : records) {
                writer.write(record);
            }
            return Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
        }
    }

Following is the heap usage for one of the experiments, where we can see very high heap usage. The initial low-usage part is the data writes; the high heap usage starts with the commit phase.

--
Ryan Blue
Tabular
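As an aside on the profiler/heap dump suggestion above: a minimal sketch (HotSpot JVMs only; the output path is a placeholder) of writing a heap dump programmatically right after the commit phase, which can then be opened in jvisualvm or Eclipse MAT:

    import java.io.IOException;
    import java.lang.management.ManagementFactory;
    import com.sun.management.HotSpotDiagnosticMXBean;

    static void dumpHeap(String path) throws IOException {
        HotSpotDiagnosticMXBean hotspot =
            ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        // true = dump only live (reachable) objects
        hotspot.dumpHeap(path, true);
    }

    // e.g. call after all commit futures complete:
    // dumpHeap("/tmp/commit-phase.hprof");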