[ https://issues.apache.org/jira/browse/GOBBLIN-1715?focusedWorklogId=814798&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814798 ]
ASF GitHub Bot logged work on GOBBLIN-1715:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Oct/22 19:49
            Start Date: 07/Oct/22 19:49
    Worklog Time Spent: 10m

Work Description: rdsr commented on code in PR #3574:
URL: https://github.com/apache/gobblin/pull/3574#discussion_r990454724


##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A process-wide pool of {@link VectorizedRowBatch} instances, keyed by ORC schema.
+ * Idle batches are expired by a daemon thread so the pool does not pin memory
+ * for schemas that are no longer being written.
+ */
+@Slf4j
+public class RowBatchPool {
+  static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+  static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+  static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+  static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
+  static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
+
+  private static RowBatchPool INSTANCE;
+
+  private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
+  private final ScheduledExecutorService rowBatchExpiryThread;
+  private final long rowBatchExpiryInterval;
+
+  private RowBatchPool(State properties) {
+    rowBatches = Maps.newHashMap();
+    rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder().setDaemon(true).build());
+    // expire row batches that have not been used for N secs
+    rowBatchExpiryInterval = properties.getPropAsLong(ROW_BATCH_EXPIRY_INTERVAL, ROW_BATCH_EXPIRY_INTERVAL_DEFAULT);
+    // check for expired row batches every N secs
+    long rowBatchExpiryPeriod = properties.getPropAsLong(ROW_BATCH_EXPIRY_PERIOD, ROW_BATCH_EXPIRY_PERIOD_DEFAULT);
+    rowBatchExpiryThread.scheduleAtFixedRate(
+        rowBatchExpiryFn(), rowBatchExpiryPeriod, rowBatchExpiryPeriod, TimeUnit.SECONDS);
+  }
+
+  private Runnable rowBatchExpiryFn() {
+    return () -> {
+      synchronized (rowBatches) {
+        for (Map.Entry<TypeDescription, LinkedList<RowBatchHolder>> e : rowBatches.entrySet()) {
+          LinkedList<RowBatchHolder> val = e.getValue();
+          val.removeIf(this::candidateForRemoval);
+        }
+      }
+    };
+  }
+
+  private boolean candidateForRemoval(RowBatchHolder batch) {
+    long expiryInterval = TimeUnit.SECONDS.toMillis(rowBatchExpiryInterval);
+    long interval = System.currentTimeMillis() - batch.lastUsed;
+    if (interval > expiryInterval) {
+      log.info("Expiring row batch {} as it has not been accessed for {} ms",
+          System.identityHashCode(batch.rowBatch), interval);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private static class RowBatchHolder {
+    long lastUsed;
+    VectorizedRowBatch rowBatch;
+
+    private RowBatchHolder(VectorizedRowBatch rowBatch, long currentTimeMillis) {
+      this.rowBatch = rowBatch;
+      this.lastUsed = currentTimeMillis;
+    }
+  }
+
+  public synchronized static RowBatchPool instance(State properties) {
+    if (INSTANCE == null) {
+      INSTANCE = new RowBatchPool(properties);
+    }
+    return INSTANCE;
+  }
+
+  public VectorizedRowBatch getRowBatch(TypeDescription schema, int batchSize) {
+    synchronized (rowBatches) {
+      LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
+      VectorizedRowBatch rowBatch;
+      if (vals == null || vals.size() == 0) {
+        rowBatch = schema.createRowBatch(batchSize);
+        log.info("Creating new row batch {}", System.identityHashCode(rowBatch));
+      } else {
+        rowBatch = vals.removeLast().rowBatch;
+        log.info("Using existing row batch {}", System.identityHashCode(rowBatch));
+      }
+      return rowBatch;
+    }
+  }
+
+  public void recycle(TypeDescription schema, VectorizedRowBatch rowBatch) {
+    log.info("Recycling row batch {}", System.identityHashCode(rowBatch));
+    synchronized (rowBatches) {
+      LinkedList<RowBatchHolder> vals = rowBatches.get(schema);

Review Comment:
   Fixed it to use computeIfAbsent
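The diff above is truncated at the line where the review comment is anchored, so the body of recycle is not fully shown. Going by the reviewer's note, the fixed version presumably replaces the get()-plus-null-check pattern with Map#computeIfAbsent; the following is a minimal sketch of that shape under that assumption, not necessarily the exact code merged in the PR:

{code:java}
public void recycle(TypeDescription schema, VectorizedRowBatch rowBatch) {
  log.info("Recycling row batch {}", System.identityHashCode(rowBatch));
  synchronized (rowBatches) {
    // computeIfAbsent creates the per-schema list on first use, replacing
    // the explicit get()/null-check/put() sequence flagged in the review.
    rowBatches.computeIfAbsent(schema, k -> Lists.newLinkedList())
        .add(new RowBatchHolder(rowBatch, System.currentTimeMillis()));
  }
}
{code}

For context, a caller would borrow a batch, use it, and return it roughly as below; the enclosing writer code here is illustrative and not part of this diff:

{code:java}
State state = new State();
// assumed to be consulted by the ORC writer before it opts into pooling
state.setProp(RowBatchPool.ENABLE_ROW_BATCH_POOL, "true");

TypeDescription schema = TypeDescription.fromString("struct<id:int,tags:array<string>>");
RowBatchPool pool = RowBatchPool.instance(state);
VectorizedRowBatch batch = pool.getRowBatch(schema, 256); // borrow from the pool
try {
  // ... fill the batch and hand it to the ORC writer ...
} finally {
  batch.reset();               // clear contents before returning
  pool.recycle(schema, batch); // make it available for reuse
}
{code}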
log.info("Expiring row batch {} as it has not been accessed since {} ms", + System.identityHashCode(batch.rowBatch), interval); + return true; + } else { + return false; + } + } + + private static class RowBatchHolder { + long lastUsed; + VectorizedRowBatch rowBatch; + + private RowBatchHolder(VectorizedRowBatch rowBatch, long currentTimeMillis) { + this.rowBatch = rowBatch; + this.lastUsed = currentTimeMillis; + } + } + + public synchronized static RowBatchPool instance(State properties) { + if (INSTANCE == null) { + INSTANCE = new RowBatchPool(properties); + } + return INSTANCE; + } + + public VectorizedRowBatch getRowBatch(TypeDescription schema, int batchSize) { + synchronized (rowBatches) { + LinkedList<RowBatchHolder> vals = rowBatches.get(schema); + VectorizedRowBatch rowBatch; + if (vals == null || vals.size() == 0) { + rowBatch = schema.createRowBatch(batchSize); + log.info("Creating new row batch {}", System.identityHashCode(rowBatch)); + } else { + rowBatch = vals.removeLast().rowBatch; + log.info("Using existing row batch {}", System.identityHashCode(rowBatch)); + } + return rowBatch; + } + } + + public void recycle(TypeDescription schema, VectorizedRowBatch rowBatch) { + log.info("Recycling row batch {}", System.identityHashCode(rowBatch)); + synchronized (rowBatches) { + LinkedList<RowBatchHolder> vals = rowBatches.get(schema); Review Comment: Fixed it to use computeifabsent Issue Time Tracking ------------------- Worklog Id: (was: 814798) Time Spent: 2h 10m (was: 2h) > Support vectorized row batch pooling > ------------------------------------ > > Key: GOBBLIN-1715 > URL: https://issues.apache.org/jira/browse/GOBBLIN-1715 > Project: Apache Gobblin > Issue Type: Bug > Components: gobblin-core > Reporter: Ratandeep Ratti > Assignee: Abhishek Tiwari > Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > The pre-allocation method allocates vastly more memory for ORC ColumnVectors > of arrays and maps than needed and is unpredictable as it depends upon the > size of the current column vector’s length, which can change as we allocate > more memory to it. From the heap dump done on a kafka topic we saw that on > the second resize call for an array ColumnVector, where request size was ~ 1k > elements, it had requested to allocate around 444M elements. This resulted in > over allocating way past the heap size. This was the primary reason why we > see OOM failures during ingestion for deeply nested records > Update: Below is an example of how a very large memory can be allocated using > smart resizing procedure. The formula for allocating memory is > {noformat} > child_vector resize = > child_vector_request_size + > (child_vector_request_size / rowsAdded + 1) * current_vector_size > {noformat} > If we now have deeply nested arrays of arrays each of 525 elements in a row > like The memory will be allocated as such. > {noformat} > 1st resize = (525 + 525/1 + 1) * 256 = 135181 ; current vector size by > default is batch size = 256 > 2nd resize = (525 + 525/1 + 1) * 135181 = *71105731* > ; current vector size = 135181 > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)