[ 
https://issues.apache.org/jira/browse/GOBBLIN-1715?focusedWorklogId=814797&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814797
 ]

ASF GitHub Bot logged work on GOBBLIN-1715:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Oct/22 19:48
            Start Date: 07/Oct/22 19:48
    Worklog Time Spent: 10m 
      Work Description: rdsr commented on code in PR #3574:
URL: https://github.com/apache/gobblin/pull/3574#discussion_r990454116


##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
+    static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
+
+    private static RowBatchPool INSTANCE;
+
+    private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
+    private final ScheduledExecutorService rowBatchExpiryThread;
+    private final long rowBatchExpiryInterval;
+
+    private RowBatchPool(State properties) {
+        rowBatches = Maps.newHashMap();
+        rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setDaemon(true).build());
+        // expire row batches older N secs
+        rowBatchExpiryInterval = properties.getPropAsLong(ROW_BATCH_EXPIRY_INTERVAL, 10);
+        // check every N secs
+        long rowBatchExpiryPeriod = properties.getPropAsLong(ROW_BATCH_EXPIRY_PERIOD, 1);
+        rowBatchExpiryThread.scheduleAtFixedRate(
+                rowBatchExpiryFn(), rowBatchExpiryPeriod, rowBatchExpiryPeriod, TimeUnit.SECONDS);
+    }
+
+    private Runnable rowBatchExpiryFn() {
+        return () -> {
+            synchronized (rowBatches) {
+                for (Map.Entry<TypeDescription, LinkedList<RowBatchHolder>> e : rowBatches.entrySet()) {
+                    LinkedList<RowBatchHolder> val = e.getValue();
+                    val.removeIf(this::candidateForRemoval);
+                }
+            }
+        };
+    }
+
+    private boolean candidateForRemoval(RowBatchHolder batch) {
+        long expiryInterval = TimeUnit.SECONDS.toMillis(rowBatchExpiryInterval);
+        long interval = System.currentTimeMillis() - batch.lastUsed;
+        if (interval > expiryInterval) {
+            log.info("Expiring row batch {} as it has not been accessed since {} ms",
+                    System.identityHashCode(batch.rowBatch), interval);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private static class RowBatchHolder {
+        long lastUsed;
+        VectorizedRowBatch rowBatch;
+
+        private RowBatchHolder(VectorizedRowBatch rowBatch, long currentTimeMillis) {
+            this.rowBatch = rowBatch;
+            this.lastUsed = currentTimeMillis;
+        }
+    }
+
+    public synchronized static RowBatchPool instance(State properties) {
+        if (INSTANCE == null) {
+            INSTANCE = new RowBatchPool(properties);
+        }
+        return INSTANCE;
+    }
+
+    public VectorizedRowBatch getRowBatch(TypeDescription schema, int batchSize) {
+        synchronized (rowBatches) {
+            LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
+            VectorizedRowBatch rowBatch;
+            if (vals == null || vals.size() == 0) {

Review Comment:
   I'm not keen on adding more dependencies here if this is a small change 





Issue Time Tracking
-------------------

    Worklog Id:     (was: 814797)
    Time Spent: 2h  (was: 1h 50m)

> Support vectorized row batch pooling
> ------------------------------------
>
>                 Key: GOBBLIN-1715
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1715
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-core
>            Reporter: Ratandeep Ratti
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> The pre-allocation method allocates vastly more memory for ORC ColumnVectors 
> of arrays and maps than needed, and the amount is unpredictable because it 
> depends on the current column vector's length, which can change as we 
> allocate more memory to it. In a heap dump taken while ingesting a Kafka 
> topic, we saw that on the second resize call for an array ColumnVector, where 
> the request size was ~1k elements, it requested an allocation of around 444M 
> elements. This over-allocated well past the heap size and was the primary 
> reason we see OOM failures during ingestion of deeply nested records.
> Update: Below is an example of how a very large amount of memory can be 
> allocated by the smart resizing procedure. The formula for allocating memory is 
> {noformat}
> child_vector resize = 
>    child_vector_request_size  + 
>   (child_vector_request_size / rowsAdded + 1) * current_vector_size
> {noformat}
> If we now have deeply nested arrays of arrays, each with 525 elements in a 
> row, the memory will be allocated as follows.
> {noformat}
> 1st resize = 525 + (525/1 + 1) * 256    = 135181    ; current vector size by default is batch size = 256
> 2nd resize = 525 + (525/1 + 1) * 135181 = 71105731  ; current vector size = 135181
> {noformat}
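
The arithmetic in the description can be reproduced with a short sketch. The class and method names below are hypothetical and are not part of ORC or Gobblin; the method only applies the formula quoted above (with integer division) and is an illustration, not the library's actual resizing code.

```java
// Hypothetical helper that mirrors the formula quoted in the issue description.
// It is NOT the ORC implementation; it only reproduces the arithmetic.
final class SmartResizeSketch {

    // child_vector resize =
    //     child_vector_request_size
    //   + (child_vector_request_size / rowsAdded + 1) * current_vector_size
    static long resize(long requestSize, long rowsAdded, long currentVectorSize) {
        return requestSize + (requestSize / rowsAdded + 1) * currentVectorSize;
    }

    public static void main(String[] args) {
        long size = 256; // starting vector size = default batch size from the example
        for (int call = 1; call <= 2; call++) {
            // Each call feeds the previous result back in as the current vector size.
            size = resize(525, 1, size);
            System.out.println("resize " + call + " = " + size);
        }
    }
}
```

Because each result becomes the next call's current vector size, the requested size is multiplied by roughly (request size / rows added + 1) on every resize, which is why two calls are enough to go from 256 to about 71M elements.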



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
