We are in the process of supporting multiple file system schemes using
ResolvingFileIO; Ryan just added the initial implementation:
https://github.com/apache/iceberg/pull/3593
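
For anyone who wants to try it, switching a catalog over should only be a
matter of catalog properties. A minimal sketch (the warehouse path is a
placeholder; the property keys come from CatalogProperties):

import java.util.Map;
import org.apache.iceberg.CatalogProperties;

Map<String, String> catalogProps = Map.of(
    // resolves a concrete FileIO (S3FileIO, HadoopFileIO, ...) per path scheme
    CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.io.ResolvingFileIO",
    CatalogProperties.WAREHOUSE_LOCATION, "gs://my-bucket/warehouse");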

-Jack

On Tue, Nov 30, 2021 at 6:41 PM Mayur Srivastava <
mayur.srivast...@twosigma.com> wrote:

> Thanks Ryan.
>
>
>
> I’m looking at the heapdump. At a preliminary look in jvisualvm, I see the
> following top two objects:
>
> 1.      ‘byte[]’ : 87% of memory usage (>100k instances with a total of
> 3.2G in one of my tests). I checked some of the references and found that
> they come from
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.
>  MediaHttpUploader
> <- GoogleHadoopOutputStream <- AvroFileAppender. I also see references
> coming from WriterBasedJsonGenerator, finalizers
> (HadoopPositionOutputStream), etc. I’m not familiar with this code, but is
> it possible that the Hadoop output streams are never closed explicitly and
> close() only runs in the finalizers? (See the sketch after this list.)
>
> 2.      ‘int[]’ : 12% usage (7k instances), but I can’t expand the
> references.
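>
> To make the finalizer question in (1) concrete, here is the pattern I’m
> wondering about (a hypothetical sketch, not the actual writer code): if
> close() is only ever reached through finalization, the upload buffers stay
> reachable until GC gets around to them, whereas an explicit close releases
> them deterministically.
>
> import java.io.IOException;
> import org.apache.iceberg.io.OutputFile;
> import org.apache.iceberg.io.PositionOutputStream;
>
> // Hypothetical helper: try-with-resources guarantees close() runs as soon
> // as the write finishes, instead of waiting for a finalizer.
> static void writeAndClose(OutputFile file, byte[] payload) throws IOException {
>     try (PositionOutputStream out = file.createOrOverwrite()) {
>         out.write(payload);
>     }
> }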
>
>
>
> One interesting finding is that if I switch to the S3FileIO, the high
> memory usage goes away and the memory usage is similar to that of the
> serialized commits using a lock, which is ~750 M for 128 parallel
> committers. That 750 M usage may fall in line with the snapshot and
> manifest* objects.
>
>
>
> So, the high memory problem manifests only when using the default
> HadoopFileSystem.
>
>
>
> Thanks, Mayur
>
>
>
> PS: I had to change S3FileIO locally to accept the gs:// prefix so that it
> works with GCS. Is there a plan to support the gs:// prefix in S3URI?
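>
> In case it helps anyone else, an alternative I considered instead of
> patching S3URI: point the S3 client at GCS’s S3-compatible XML API via an
> endpoint override, so the paths can keep the s3:// scheme. An untested
> sketch; the region value is a placeholder that GCS ignores:
>
> import java.net.URI;
> import org.apache.iceberg.aws.s3.S3FileIO;
> import software.amazon.awssdk.regions.Region;
> import software.amazon.awssdk.services.s3.S3Client;
>
> S3FileIO io = new S3FileIO(() -> S3Client.builder()
>     .endpointOverride(URI.create("https://storage.googleapis.com"))
>     .region(Region.US_EAST_1) // placeholder region
>     .build());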
>
>
>
> *From:* Ryan Blue <b...@tabular.io>
> *Sent:* Tuesday, November 30, 2021 3:53 PM
> *To:* Iceberg Dev List <dev@iceberg.apache.org>
> *Subject:* Re: High memory usage with highly concurrent committers
>
>
>
> Mayur,
>
>
>
> Is it possible to connect to this process with a profiler and look at
> what's taking up all of the space?
>
>
>
> I suspect that what's happening here is that you're loading the list of
> snapshots for each version of metadata, so you're holding a lot of copies
> of the entire snapshot history and possibly caching the list of manifests
> for some snapshots as well.
>
>
>
> I've thought about adding a way to avoid parsing and loading snapshots,
> probably by passing a cache when loading metadata so that all the copies of
> a table can share snapshots in memory. That would work fine because they're
> immutable. That might help you here, although a Snapshot instance will
> cache manifests after loading them if they are accessed, so you'd want to
> watch out for that as well.
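>
> As a rough sketch of what I mean (this is not an existing API, just the
> shape of the idea): since Snapshot instances are immutable, metadata
> loading could intern them by snapshot ID so that every copy of the table
> shares one instance per snapshot.
>
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.function.LongFunction;
> import org.apache.iceberg.Snapshot;
>
> class SnapshotCache {
>   private final Map<Long, Snapshot> byId = new ConcurrentHashMap<>();
>
>   // return the shared instance if present, otherwise parse and cache it
>   Snapshot intern(long snapshotId, LongFunction<Snapshot> parse) {
>     return byId.computeIfAbsent(snapshotId, parse::apply);
>   }
> }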
>
>
>
> The best step forward is to get an idea of what objects are taking up that
> space with a profiler or heap dump if you can.
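>
> If attaching a profiler is awkward, you can also trigger a dump from
> inside the test harness with the HotSpot diagnostic MXBean (the output
> path here is just an example):
>
> import java.lang.management.ManagementFactory;
> import com.sun.management.HotSpotDiagnosticMXBean;
>
> HotSpotDiagnosticMXBean diag =
>     ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
> // live = true dumps only reachable objects
> diag.dumpHeap("/tmp/committers.hprof", true);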
>
>
>
> Ryan
>
>
>
> On Tue, Nov 30, 2021 at 12:34 PM Mayur Srivastava <
> mayur.srivast...@twosigma.com> wrote:
>
> Hi Iceberg Community,
>
>
>
> I’m running some experiments with high commit contention (on the same
> Iceberg table writing to different partitions) and I'm observing very high
> memory usage (5G to 7G). (Note that the data being written is very small.)
>
>
>
> *The scenario is described below:*
>
>
>
> *Note1: The catalog used is similar to the JDBC catalog.*
>
> *Note2: The data is stored on S3 and HadoopFileSystem is used to talk to
> S3.*
>
> *Note3: Iceberg code is ~6 months old. I haven’t tried the latest main
> branch.*
>
>
>
> *Experiment params:*
>
> a. NT = 64 = number of parallel committers. Achieved using multiple
> threads within the same process.
>
> b. minWait = COMMIT_MIN_RETRY_WAIT_MS
>
> c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
>
> d. numRetries = COMMIT_NUM_RETRIES = 256 (I increased this to make sure
> enough retries are done so that all committers finish successfully).
>
>
>
> *Steps:*
>
>
>
> * *Create an Iceberg table *with three columns: time (timestamp without
> timezone), id (int32), value (float64). The partition spec is (time, MONTH).
>
> * Sequential step: create *NT different AppendFiles* objects.
>
> * Sequential write step: for 1 to NT, *write 1 row* (in a unique month)
> and append the DataFile to the corresponding AppendFiles. Basically, we
> create one Parquet file per month (i.e. per partition) containing a single
> row. This keeps the data size small for the experiment and ensures that
> each commit touches a different partition.
>
> * *Parallel commit step*: Create a ThreadPool of NT threads, submit a
> Runnable which calls *AppendFiles.commit()*, and get the Future. That is,
> run the commits in parallel.
>
> * Wait for all Futures to finish.
>
> I ran this experiment with various parameter values. For example, I
> varied minWait from 100 ms to 5 s, maxWait from 1 s to 60 s, and NT in
> (8, 16, 32, 64, 128). Code snippets can be found below.
>
>
>
> *Observations:*
>
> A. Total elapsed commit time increases with the number of committers,
> which is expected. E.g., with NT=64, minWait=100 ms, maxWait=10 s, the
> total elapsed commit time is more than 250 s. This is acceptable given the
> nature of OCC under high concurrency.
>
> B. The number of table metadata files is a multiple of the number of
> committers. E.g., with NT=64, minWait=100 ms, maxWait=10 s, the total
> number of table metadata JSON files was 380. This is acceptable given the
> nature of OCC under high concurrency.
>
> *C. The memory usage keeps shooting up periodically, reaching 7G in some
> experiments. This is noticeable (i.e. memory usage > 1G) when the number
> of concurrent committers is >= 16 and gets worse as the number of
> committers increases. I’ve not investigated further, but it could be that
> the in-memory metadata (snapshots, etc.) is growing very large. If I
> serialize the commit attempts (e.g. by acquiring a lock, as sketched
> below), the high memory usage problem goes away. But I wanted to check
> here before trying out any alternatives.*
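>
> For reference, “serializing the commit attempts” above means wrapping the
> commit in a process-wide lock, roughly like the sketch below (commitLock
> is a hypothetical java.util.concurrent.locks.ReentrantLock shared by all
> committer threads; futures, executors, and appendFilesList are from the
> code snippets further down):
>
> futures.add(executors.submit(() -> {
>     commitLock.lock();
>     try {
>         appendFilesList.get(i).commit();
>     } finally {
>         commitLock.unlock();
>     }
> }));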
>
>
>
> *Why is the concurrent commit important to us?*
>
> We have several users who use various processing engines to schedule their
> writes (into non-overlapping partitions) through a data service that takes
> care of writing and committing the data. In many cases, they end up in the
> high-commit-contention scenario described above. My main worry is that
> this is happening for a single table; if multiple tables are being
> committed, the memory usage will be much larger.
>
>
>
> *Questions:  *
>
> 1.      Have others observed this behavior? Is the high memory usage
> expected, or am I doing something wrong? Is there any way to reduce the
> memory footprint (e.g. by changing some metadata config) during the commit?
>
> 2.      What is the general recommendation for highly concurrent
> committers? Are highly concurrent committers an anti-pattern for Iceberg?
>
>
>
> Thanks,
>
> Mayur
>
> *Code snippets:*
>
>
>
> Schema schema = new Schema(
>     NestedField.of(1, false, "time", TimestampType.withoutZone()),
>     NestedField.of(2, false, "id", IntegerType.get()),
>     NestedField.of(3, false, "value", DoubleType.get())
> );
>
> catalog.createTable(
>     tableIdentifier,
>     schema,
>     PartitionSpec.builderFor(schema).month("time").build(),
>     ImmutableMap.of(
>         TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
>         TableProperties.COMMIT_MIN_RETRY_WAIT_MS, String.valueOf(minWait),
>         TableProperties.COMMIT_MAX_RETRY_WAIT_MS, String.valueOf(maxWait),
>         "write.metadata.previous-versions-max", String.valueOf(1)
>     ) // table properties
> );
>
> // Write data phase.
> List<AppendFiles> appendFilesList = new ArrayList<>();
> for (int m = 0; m < NT; m++) {
>     appendFilesList.add(table.newAppend());
> }
>
> for (int m = 0; m < NT; m++) {
>     LocalDateTime time =
>         LocalDateTime.of(2021, 1, 1, 0, 0).plus(m, ChronoUnit.MONTHS);
>     ImmutableList<GenericRecord> records =
>         ImmutableList.of(createRecord(schema, time, 1, 10.0));
>     writeRecords(table, records).forEach(appendFilesList.get(m)::appendFile);
> }
>
> // Commit phase.
> // High memory usage starts from the commit phase.
> ExecutorService executors = Executors.newFixedThreadPool(NT);
> List<Future<?>> futures = new ArrayList<>();
> for (int m = 0; m < NT; m++) {
>     final int i = m;
>     futures.add(executors.submit(() -> {
>         appendFilesList.get(i).commit();
>     }));
> }
>
> for (int m = 0; m < NT; m++) {
>     futures.get(m).get();
> }
>
> executors.shutdownNow();
>
> // Snippet of writeRecords().
> private static List<DataFile> writeRecords(Table table, List<GenericRecord> records)
>         throws IOException {
>     // PartitionedWriterImpl extends PartitionedWriter<Record>
>     try (var writer = new PartitionedWriterImpl(table)) {
>         for (var record : records) {
>             writer.write(record);
>         }
>         return Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
>     }
> }
>
> Following is the heap usage chart for one of the experiments, where the
> very high heap usage is visible. The initial low-usage section is the data
> writes; the high heap usage starts with the commit phase.
>
>
>
>
> --
>
> Ryan Blue
>
> Tabular
>
