homatthew commented on code in PR #3574:
URL: https://github.com/apache/gobblin/pull/3574#discussion_r985012096


##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
+    static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
+
+    private static RowBatchPool INSTANCE;
+
+    private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
+    private final ScheduledExecutorService rowBatchExpiryThread;
+    private final long rowBatchExpiryInterval;
+
+    private RowBatchPool(State properties) {
+        rowBatches = Maps.newHashMap();
+        rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setDaemon(true).build());
+        // expire row batches older N secs
+        rowBatchExpiryInterval = properties.getPropAsLong(ROW_BATCH_EXPIRY_INTERVAL, 10);
+        // check every N secs
+        long rowBatchExpiryPeriod = properties.getPropAsLong(ROW_BATCH_EXPIRY_PERIOD, 1);

Review Comment:
   You forgot to replace these with the default values declared above
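   
   i.e. something like this (just a sketch, reusing the constants already declared in this class):
   
   ```java
   // use the declared defaults instead of repeating the literals 10 and 1
   rowBatchExpiryInterval = properties.getPropAsLong(ROW_BATCH_EXPIRY_INTERVAL, ROW_BATCH_EXPIRY_INTERVAL_DEFAULT);
   long rowBatchExpiryPeriod = properties.getPropAsLong(ROW_BATCH_EXPIRY_PERIOD, ROW_BATCH_EXPIRY_PERIOD_DEFAULT);
   ```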



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -153,14 +157,18 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
 
     // Create value-writer which is essentially a record-by-record-converter with buffering in batch.
     this.inputSchema = builder.getSchema();
-    TypeDescription typeDescription = getOrcSchema();
+    this.typeDescription = getOrcSchema();
    this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties);
    this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
-    this.rowBatch = typeDescription.createRowBatch(this.batchSize);
+    this.rowBatchPool = RowBatchPool.instance(properties);

Review Comment:
   Is GobblinBaseOrcWriter created / GC'ed often during the lifetime of the pipeline, or is it just created once at the start? Not sure if it's overkill, but if the recycling is frequent enough it doesn't hurt to use https://www.baeldung.com/java-singleton-double-checked-locking instead.
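   
   A minimal sketch of the double-checked locking variant, keeping the existing field and method names (`INSTANCE` would need to become volatile):
   
   ```java
   private static volatile RowBatchPool INSTANCE;
   
   public static RowBatchPool instance(State properties) {
       if (INSTANCE == null) {
           synchronized (RowBatchPool.class) {
               if (INSTANCE == null) {
                   INSTANCE = new RowBatchPool(properties);
               }
           }
       }
       return INSTANCE;
   }
   ```
   That avoids taking the lock on every `instance()` call once the pool has been created.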



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
+    static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
+
+    private static RowBatchPool INSTANCE;
+
+    private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
+    private final ScheduledExecutorService rowBatchExpiryThread;
+    private final long rowBatchExpiryInterval;
+
+    private RowBatchPool(State properties) {
+        rowBatches = Maps.newHashMap();
+        rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setDaemon(true).build());
+        // expire row batches older N secs
+        rowBatchExpiryInterval = properties.getPropAsLong(ROW_BATCH_EXPIRY_INTERVAL, 10);
+        // check every N secs
+        long rowBatchExpiryPeriod = properties.getPropAsLong(ROW_BATCH_EXPIRY_PERIOD, 1);
+        rowBatchExpiryThread.scheduleAtFixedRate(
+                rowBatchExpiryFn(), rowBatchExpiryPeriod, rowBatchExpiryPeriod, TimeUnit.SECONDS);
+    }
+
+    private Runnable rowBatchExpiryFn() {
+        return () -> {
+            synchronized (rowBatches) {
+                for (Map.Entry<TypeDescription, LinkedList<RowBatchHolder>> e : rowBatches.entrySet()) {
+                    LinkedList<RowBatchHolder> val = e.getValue();
+                    val.removeIf(this::candidateForRemoval);
+                }
+            }
+        };
+    }
+
+    private boolean candidateForRemoval(RowBatchHolder batch) {
+        long expiryInterval = TimeUnit.SECONDS.toMillis(rowBatchExpiryInterval);
+        long interval = System.currentTimeMillis() - batch.lastUsed;
+        if (interval > expiryInterval) {
+            log.info("Expiring row batch {} as it has not been accessed since {} ms",
+                    System.identityHashCode(batch.rowBatch), interval);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private static class RowBatchHolder {
+        long lastUsed;
+        VectorizedRowBatch rowBatch;
+
+        private RowBatchHolder(VectorizedRowBatch rowBatch, long currentTimeMillis) {
+            this.rowBatch = rowBatch;
+            this.lastUsed = currentTimeMillis;
+        }
+    }
+
+    public synchronized static RowBatchPool instance(State properties) {
+        if (INSTANCE == null) {
+            INSTANCE = new RowBatchPool(properties);
+        }
+        return INSTANCE;
+    }
+
+    public VectorizedRowBatch getRowBatch(TypeDescription schema, int batchSize) {
+        synchronized (rowBatches) {
+            LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
+            VectorizedRowBatch rowBatch;
+            if (vals == null || vals.size() == 0) {
+                rowBatch = schema.createRowBatch(batchSize);
+                log.info("Creating new row batch {}", 
System.identityHashCode(rowBatch));
+            } else {
+                rowBatch = vals.removeLast().rowBatch;
+                log.info("Using existing row batch {}", 
System.identityHashCode(rowBatch));
+            }
+            return rowBatch;
+        }
+    }
+
+    public void recycle(TypeDescription schema, VectorizedRowBatch rowBatch) {
+        log.info("Recycling row batch {}", System.identityHashCode(rowBatch));
+        synchronized (rowBatches) {
+            LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
+            if (vals == null) {

Review Comment:
   Preference between computeIfAbsent and null check?
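   
   e.g. a sketch with `computeIfAbsent` (same field names as in this patch):
   
   ```java
   // computeIfAbsent collapses the null check and the put into one call
   synchronized (rowBatches) {
       rowBatches.computeIfAbsent(schema, key -> Lists.newLinkedList())
           .add(new RowBatchHolder(rowBatch, System.currentTimeMillis()));
   }
   ```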



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
+    static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
+

Review Comment:
   Really minor nits about style: typically we follow a pattern of having a shared prefix variable for `orc.row.batch.expiry.` (or even just `orc.row.batch` as the prefix), and we start the default-value constant names with `DEFAULT_`.
   
   See https://github.com/apache/gobblin/blob/3733d6028c437e18eff349ba56d8264a56d4673f/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java#L109-L110 for an example.
   
   Also, maybe add the word pool to these settings, since they are specific to the batch pool and not to regular row batches?
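   
   Roughly like this (just a sketch, the exact names are up to you):
   
   ```java
   static final String ORC_ROW_BATCH_POOL_PREFIX = "orc.row.batch.pool";
   static final String ROW_BATCH_POOL_EXPIRY_INTERVAL_SECS = ORC_ROW_BATCH_POOL_PREFIX + ".expiry.interval.secs";
   static final int DEFAULT_ROW_BATCH_POOL_EXPIRY_INTERVAL_SECS = 10;
   static final String ROW_BATCH_POOL_EXPIRY_PERIOD_SECS = ORC_ROW_BATCH_POOL_PREFIX + ".expiry.period.secs";
   static final int DEFAULT_ROW_BATCH_POOL_EXPIRY_PERIOD_SECS = 1;
   ```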



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -153,14 +157,18 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
 
     // Create value-writer which is essentially a record-by-record-converter with buffering in batch.
     this.inputSchema = builder.getSchema();
-    TypeDescription typeDescription = getOrcSchema();
+    this.typeDescription = getOrcSchema();
    this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties);
    this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
-    this.rowBatch = typeDescription.createRowBatch(this.batchSize);
+    this.rowBatchPool = RowBatchPool.instance(properties);
+    this.enableRowBatchPool = properties.getPropAsBoolean(RowBatchPool.ENABLE_ROW_BATCH_POOL, true);

Review Comment:
   The default should probably be a separate variable. Did we plan a gradual rollout? If so, a default of false may be preferred until we are very certain about the performance of this feature across all topics.
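   
   i.e. something like this (sketch; assumes we do want the flag off by default during rollout):
   
   ```java
   // in RowBatchPool
   static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
   static final boolean DEFAULT_ENABLE_ROW_BATCH_POOL = false;
   
   // in GobblinBaseOrcWriter
   this.enableRowBatchPool = properties.getPropAsBoolean(
       RowBatchPool.ENABLE_ROW_BATCH_POOL, RowBatchPool.DEFAULT_ENABLE_ROW_BATCH_POOL);
   ```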



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
+    static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
+
+    private static RowBatchPool INSTANCE;
+
+    private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
+    private final ScheduledExecutorService rowBatchExpiryThread;
+    private final long rowBatchExpiryInterval;
+
+    private RowBatchPool(State properties) {
+        rowBatches = Maps.newHashMap();
+        rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setDaemon(true).build());
+        // expire row batches older N secs

Review Comment:
   I wonder if it's clearer to have these comments as doc strings on the fields where they are declared instead. I'm fairly impartial since these are all private variables anyway, but if this class were to grow in size / complexity, that would be the better spot.
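   
   e.g. (sketch):
   
   ```java
   /** Row batches not used for this many seconds are removed from the pool. */
   private final long rowBatchExpiryInterval;
   ```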



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";

Review Comment:
   Also, naming-wise it's a little confusing: interval and period are near-synonyms, so someone reading the config will likely get confused unless they read the code. Here are some ideas:
   
   ```
   orc.row.batch.pool.checkExpiryIntervalSeconds
   orc.row.batch.pool.expiryTTLSeconds
   ```



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -235,6 +243,9 @@ private synchronized void closeInternal()
       this.flush();
       this.orcFileWriter.close();
       this.closed = true;
+      if (enableRowBatchPool) {

Review Comment:
   How often does this occur?



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -269,6 +280,7 @@ public void commit()
   @Override
   public void write(D record)
       throws IOException {
+    Preconditions.checkState(!closed, "Writer already closed");

Review Comment:
   When did you see this edge case happen? And does this cause the fork to 
immediately terminate?



##########
gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/RowBatchPoolTest.java:
##########
@@ -0,0 +1,36 @@
+package org.apache.gobblin.writer;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class RowBatchPoolTest {
+    @Test
+    public void testExpiry() throws Exception {
+        State state = WorkUnit.createEmpty();
+        RowBatchPool instance = RowBatchPool.instance(state);
+        TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");
+        VectorizedRowBatch rowBatch1 = instance.getRowBatch(schema, 1024);
+        instance.recycle(schema, rowBatch1);
+        VectorizedRowBatch rowBatch2 = instance.getRowBatch(schema, 1024);
+        // existing rowbatch is fetched from pool
+        Assert.assertEquals(rowBatch1, rowBatch2);
+
+        // since the pool has no existing rowbatch, a new one is created
+        VectorizedRowBatch rowBatch3 = instance.getRowBatch(schema, 1024);
+        Assert.assertNotEquals(rowBatch1, rowBatch3);
+
+        // recycle fetched rowbatches
+        instance.recycle(schema, rowBatch2);
+        instance.recycle(schema, rowBatch3);
+
+        // wait for their expiry
+        Thread.sleep(RowBatchPool.ROW_BATCH_EXPIRY_INTERVAL_DEFAULT * 1000L);

Review Comment:
   In general, Thread.sleep in UTs is bad long-term for our CI. We should really be relying on a Clock-like object to represent elapsed time; then we can mock the passing of time without wasting 10 seconds doing nothing.
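   
   A rough sketch of what I mean; the `Clock` / `FakeClock` names here are hypothetical and not part of this patch:
   
   ```java
   // Inject a clock into RowBatchPool so tests can advance time without sleeping.
   interface Clock {
       long currentTimeMillis();
   }
   
   class FakeClock implements Clock {
       private long now = 0L;
       public long currentTimeMillis() { return now; }
       void advanceSeconds(long secs) { now += secs * 1000L; }
   }
   ```
   Production code would pass `System::currentTimeMillis`; the test would pass a `FakeClock`, advance it past the expiry interval, and invoke the expiry logic directly instead of sleeping.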



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
+    static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
+
+    private static RowBatchPool INSTANCE;
+
+    private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
+    private final ScheduledExecutorService rowBatchExpiryThread;
+    private final long rowBatchExpiryInterval;
+
+    private RowBatchPool(State properties) {
+        rowBatches = Maps.newHashMap();
+        rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setDaemon(true).build());
+        // expire row batches older N secs
+        rowBatchExpiryInterval = properties.getPropAsLong(ROW_BATCH_EXPIRY_INTERVAL, 10);
+        // check every N secs
+        long rowBatchExpiryPeriod = properties.getPropAsLong(ROW_BATCH_EXPIRY_PERIOD, 1);
+        rowBatchExpiryThread.scheduleAtFixedRate(
+                rowBatchExpiryFn(), rowBatchExpiryPeriod, rowBatchExpiryPeriod, TimeUnit.SECONDS);
+    }
+
+    private Runnable rowBatchExpiryFn() {
+        return () -> {
+            synchronized (rowBatches) {
+                for (Map.Entry<TypeDescription, LinkedList<RowBatchHolder>> e : rowBatches.entrySet()) {
+                    LinkedList<RowBatchHolder> val = e.getValue();
+                    val.removeIf(this::candidateForRemoval);
+                }
+            }
+        };
+    }
+
+    private boolean candidateForRemoval(RowBatchHolder batch) {
+        long expiryInterval = TimeUnit.SECONDS.toMillis(rowBatchExpiryInterval);
+        long interval = System.currentTimeMillis() - batch.lastUsed;
+        if (interval > expiryInterval) {
+            log.info("Expiring row batch {} as it has not been accessed since {} ms",
+                    System.identityHashCode(batch.rowBatch), interval);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private static class RowBatchHolder {
+        long lastUsed;
+        VectorizedRowBatch rowBatch;
+
+        private RowBatchHolder(VectorizedRowBatch rowBatch, long currentTimeMillis) {
+            this.rowBatch = rowBatch;
+            this.lastUsed = currentTimeMillis;
+        }
+    }
+
+    public synchronized static RowBatchPool instance(State properties) {
+        if (INSTANCE == null) {
+            INSTANCE = new RowBatchPool(properties);
+        }
+        return INSTANCE;
+    }
+
+    public VectorizedRowBatch getRowBatch(TypeDescription schema, int batchSize) {
+        synchronized (rowBatches) {
+            LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
+            VectorizedRowBatch rowBatch;
+            if (vals == null || vals.size() == 0) {

Review Comment:
   Dumb question, but do we have a preference between Apache Commons `CollectionUtils.isEmpty` and a basic null check like this?
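   
   For reference, a sketch of the `CollectionUtils` variant (assumes commons-collections is already on this module's classpath):
   
   ```java
   import org.apache.commons.collections.CollectionUtils;
   
   // isEmpty covers both the null case and the empty-list case
   if (CollectionUtils.isEmpty(vals)) {
       rowBatch = schema.createRowBatch(batchSize);
   } else {
       rowBatch = vals.removeLast().rowBatch;
   }
   ```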



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -153,14 +157,18 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
 
     // Create value-writer which is essentially a record-by-record-converter with buffering in batch.
     this.inputSchema = builder.getSchema();
-    TypeDescription typeDescription = getOrcSchema();
+    this.typeDescription = getOrcSchema();
    this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties);
    this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
-    this.rowBatch = typeDescription.createRowBatch(this.batchSize);
+    this.rowBatchPool = RowBatchPool.instance(properties);

Review Comment:
   I did some digging, and it seems like we use the GobblinOrcWriter in 2 spots:
   1. [Fork](https://github.com/apache/gobblin/blob/022b49fdd99cebcc21aad9b6b7cde9df0b092537/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java#L249)
   2. [AbstractJobLauncher](https://github.com/apache/gobblin/blob/b726a606cea3deb567b1fdeeba9acbcc220e6d30/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java#L516)
   
   In (1) we init a writer per fork, and we may have multiple forks in a Task, especially in the streaming model task runner. From my understanding, a fork is long-running, so we only create a new ORC writer during the initial run of a task and on retries.
   In (2) we init once during the job launch.
   
   I think I am missing something here, though, because we wouldn't need to recycle the row batches via a pool if they were created once and forgotten. 🤔
   



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java:
##########
@@ -153,14 +157,18 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
 
     // Create value-writer which is essentially a record-by-record-converter with buffering in batch.
     this.inputSchema = builder.getSchema();
-    TypeDescription typeDescription = getOrcSchema();
+    this.typeDescription = getOrcSchema();
    this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties);
    this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
-    this.rowBatch = typeDescription.createRowBatch(this.batchSize);
+    this.rowBatchPool = RowBatchPool.instance(properties);
+    this.enableRowBatchPool = properties.getPropAsBoolean(RowBatchPool.ENABLE_ROW_BATCH_POOL, true);
+    this.rowBatch = enableRowBatchPool ? rowBatchPool.getRowBatch(typeDescription, batchSize) : typeDescription.createRowBatch(batchSize);
    this.deepCleanBatch = properties.getPropAsBoolean(ORC_WRITER_DEEP_CLEAN_EVERY_BATCH, false);
 
    log.info("Created ORC writer, batch size: {}, {}: {}",
-            batchSize, OrcConf.ROWS_BETWEEN_CHECKS.name(), properties.getProp(OrcConf.ROWS_BETWEEN_CHECKS.name(),
+            batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),

Review Comment:
   Is the reason for changing this that `getAttribute` is more general while `name` is specific to Hive? I noticed that for this specific enum value they are the same string.



##########
gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/RowBatchPoolTest.java:
##########
@@ -0,0 +1,36 @@
+package org.apache.gobblin.writer;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class RowBatchPoolTest {
+    @Test
+    public void testExpiry() throws Exception {
+        State state = WorkUnit.createEmpty();
+        RowBatchPool instance = RowBatchPool.instance(state);
+        TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");
+        VectorizedRowBatch rowBatch1 = instance.getRowBatch(schema, 1024);
+        instance.recycle(schema, rowBatch1);
+        VectorizedRowBatch rowBatch2 = instance.getRowBatch(schema, 1024);
+        // existing rowbatch is fetched from pool
+        Assert.assertEquals(rowBatch1, rowBatch2);
+
+        // since the pool has no existing rowbatch, a new one is created
+        VectorizedRowBatch rowBatch3 = instance.getRowBatch(schema, 1024);
+        Assert.assertNotEquals(rowBatch1, rowBatch3);
+
+        // recycle fetched rowbatches
+        instance.recycle(schema, rowBatch2);
+        instance.recycle(schema, rowBatch3);
+
+        // wait for their expiry
+        Thread.sleep(RowBatchPool.ROW_BATCH_EXPIRY_INTERVAL_DEFAULT * 1000L);

Review Comment:
   Slightly different because I used a stopwatch instead of a clock, but here's the idea in case you're interested. Replace the stopwatch with a clock and you should get a similar result:
   
   
https://github.com/homatthew/gobblin/blob/816d5f969fb8142a5f8d927288778840f82889ba/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetricsTest.java



##########
gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/RowBatchPool.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class RowBatchPool {
+    static final String ENABLE_ROW_BATCH_POOL = "enable.row.batch.pool";
+
+    static final String ROW_BATCH_EXPIRY_INTERVAL = "orc.row.batch.expiry.interval.secs";
+    static final int ROW_BATCH_EXPIRY_INTERVAL_DEFAULT = 10;
+
+    static final String ROW_BATCH_EXPIRY_PERIOD = "orc.row.batch.expiry.period.secs";
+    static final int ROW_BATCH_EXPIRY_PERIOD_DEFAULT = 1;
+
+    private static RowBatchPool INSTANCE;
+
+    private final Map<TypeDescription, LinkedList<RowBatchHolder>> rowBatches;
+    private final ScheduledExecutorService rowBatchExpiryThread;
+    private final long rowBatchExpiryInterval;
+
+    private RowBatchPool(State properties) {
+        rowBatches = Maps.newHashMap();
+        rowBatchExpiryThread = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setDaemon(true).build());
+        // expire row batches older N secs
+        rowBatchExpiryInterval = properties.getPropAsLong(ROW_BATCH_EXPIRY_INTERVAL, 10);
+        // check every N secs
+        long rowBatchExpiryPeriod = properties.getPropAsLong(ROW_BATCH_EXPIRY_PERIOD, 1);
+        rowBatchExpiryThread.scheduleAtFixedRate(
+                rowBatchExpiryFn(), rowBatchExpiryPeriod, rowBatchExpiryPeriod, TimeUnit.SECONDS);
+    }
+
+    private Runnable rowBatchExpiryFn() {
+        return () -> {
+            synchronized (rowBatches) {
+                for (Map.Entry<TypeDescription, LinkedList<RowBatchHolder>> e : rowBatches.entrySet()) {
+                    LinkedList<RowBatchHolder> val = e.getValue();
+                    val.removeIf(this::candidateForRemoval);
+                }
+            }
+        };
+    }
+
+    private boolean candidateForRemoval(RowBatchHolder batch) {
+        long expiryInterval = TimeUnit.SECONDS.toMillis(rowBatchExpiryInterval);
+        long interval = System.currentTimeMillis() - batch.lastUsed;
+        if (interval > expiryInterval) {
+            log.info("Expiring row batch {} as it has not been accessed since {} ms",
+                    System.identityHashCode(batch.rowBatch), interval);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private static class RowBatchHolder {
+        long lastUsed;
+        VectorizedRowBatch rowBatch;
+
+        private RowBatchHolder(VectorizedRowBatch rowBatch, long currentTimeMillis) {
+            this.rowBatch = rowBatch;
+            this.lastUsed = currentTimeMillis;
+        }
+    }
+
+    public synchronized static RowBatchPool instance(State properties) {
+        if (INSTANCE == null) {
+            INSTANCE = new RowBatchPool(properties);
+        }
+        return INSTANCE;
+    }
+
+    public VectorizedRowBatch getRowBatch(TypeDescription schema, int batchSize) {
+        synchronized (rowBatches) {
+            LinkedList<RowBatchHolder> vals = rowBatches.get(schema);
+            VectorizedRowBatch rowBatch;
+            if (vals == null || vals.size() == 0) {
+                rowBatch = schema.createRowBatch(batchSize);
+                log.info("Creating new row batch {}", 
System.identityHashCode(rowBatch));
+            } else {
+                rowBatch = vals.removeLast().rowBatch;

Review Comment:
   What's the reason for using the last one versus the first one? My guess is that LIFO ordering minimizes the number of live objects in general versus FIFO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
