jihoonson commented on a change in pull request #11294:
URL: https://github.com/apache/druid/pull/11294#discussion_r659087724



##########
File path: 
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
##########
@@ -764,7 +691,7 @@ private DataSegment mergeAndPush(
     SinkMetadata sm = sinksMetadata.get(identifier);
     if (sm == null) {
       log.warn("Sink metadata not found just before merge for identifier 
[%s]", identifier);
-    } else if (numHydrants != sinksMetadata.get(identifier).getNumHydrants()) {
+    } else if (numHydrants != sm.getNumHydrants()) {
       throw new ISE("Number of restored hydrants[%d] for identifier[%s] does 
not match expected value[%d]",
                     numHydrants, identifier, 
sinksMetadata.get(identifier).getNumHydrants());

Review comment:
       This should use `sm` too because `sinksMetadata.get(identifier)` can 
return null if `drop()` is called for some reason after you get `sm above.
   
   ```suggestion
                       numHydrants, identifier, sm.getNumHydrants());
   ```

##########
File path: 
indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorDriverTest.java
##########
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.segment.realtime.appenderator;
+package org.apache.druid.indexing.appenderator;

Review comment:
       The way the coverage bot works currently is running all tests and 
finding the lines and branches in the corresponding classes to those tests. One 
requirement is that the target class to test and its test class must be in the 
same package. So, I would suggest not moving this class if possible because you 
will need to move lots of other classes along with it.

##########
File path: 
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
##########
@@ -0,0 +1,1143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.druid.data.input.Committer;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
+import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.FireHydrant;
+import org.apache.druid.segment.realtime.plumber.Sink;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+public class BatchAppenderator implements Appenderator
+{
+  public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
+  // Rough estimate of memory footprint of empty FireHydrant based on actual 
heap dumps
+  public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
+
+  private static final EmittingLogger log = new 
EmittingLogger(BatchAppenderator.class);
+  private static final int WARN_DELAY = 1000;
+  private static final String IDENTIFIER_FILE_NAME = "identifier.json";
+
+  private final String myId;
+  private final DataSchema schema;
+  private final AppenderatorConfig tuningConfig;
+  private final FireDepartmentMetrics metrics;
+  private final DataSegmentPusher dataSegmentPusher;
+  private final ObjectMapper objectMapper;
+  private final IndexIO indexIO;
+  private final IndexMerger indexMerger;
+  /**
+   * This map needs to be concurrent because it's accessed and mutated from 
multiple threads from where
+   * this Appenderator is used (and methods like {@link 
#add(SegmentIdWithShardSpec, InputRow, Supplier, boolean)} are
+   * called). It could also be accessed (but not mutated) potentially in the 
context
+   * of any thread from {@link #drop}.
+   */
+  private final ConcurrentMap<SegmentIdWithShardSpec, Sink> sinks = new 
ConcurrentHashMap<>();
+  private final long maxBytesTuningConfig;
+  private final boolean skipBytesInMemoryOverheadCheck;
+
+  /**
+   * The following sinks metadata map and associated class are the way to 
retain metadata now that sinks
+   * are being completely removed from memory after each incremental persist.
+   */
+  private final ConcurrentHashMap<SegmentIdWithShardSpec, SinkMetadata> 
sinksMetadata = new ConcurrentHashMap<>();
+
+  /**
+   * This class is used for information that needs to be kept related to Sinks 
as
+   * they are persisted and removed from memory at every incremental persist.
+   * The information is used for sanity checks and as information required
+   * for functionality, depending in the field that is used. More info about 
the
+   * fields is annotated as comments in the class
+   */
+  private static class SinkMetadata
+  {
+    /** This is used to maintain the rows in the sink accross persists of the 
sink
+    * used for functionality (i.e. to detect whether an incremental push
+    * is needed {@link AppenderatorDriverAddResult#isPushRequired(Integer, 
Long)}
+    **/
+    private int numRowsInSegment;
+    /** For sanity check as well as functionality: to make sure that all 
hydrants for a sink are restored from disk at
+     * push time and also to remember the fire hydrant "count" when persisting 
it.
+     */
+    private int numHydrants;
+
+    public SinkMetadata()
+    {
+      this(0, 0);
+    }
+
+    public SinkMetadata(int numRowsInSegment, int numHydrants)
+    {
+      this.numRowsInSegment = numRowsInSegment;
+      this.numHydrants = numHydrants;
+    }
+
+    public void addRows(int num)
+    {
+      numRowsInSegment += num;
+    }
+
+    public void addHydrants(int num)
+    {
+      numHydrants += num;
+    }
+
+    public int getNumRowsInSegment()
+    {
+      return numRowsInSegment;
+    }
+
+    public int getNumHydrants()
+    {
+      return numHydrants;
+    }
+
+  }
+
+  // This variable updated in add(), persist(), and drop()
+  private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
+  private final AtomicInteger totalRows = new AtomicInteger();
+  private final AtomicLong bytesCurrentlyInMemory = new AtomicLong();
+  private final RowIngestionMeters rowIngestionMeters;
+  private final ParseExceptionHandler parseExceptionHandler;
+
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private volatile FileLock basePersistDirLock = null;
+  private volatile FileChannel basePersistDirLockChannel = null;
+
+  /**
+   * This constructor allows the caller to provide its own 
SinkQuerySegmentWalker.
+   * <p>
+   * The sinkTimeline is set to the sink timeline of the provided 
SinkQuerySegmentWalker.
+   * If the SinkQuerySegmentWalker is null, a new sink timeline is initialized.
+   * <p>
+   * It is used by UnifiedIndexerAppenderatorsManager which allows queries on 
data associated with multiple
+   * Appenderators.
+   */
+  BatchAppenderator(
+      String id,
+      DataSchema schema,
+      AppenderatorConfig tuningConfig,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      @Nullable SinkQuerySegmentWalker sinkQuerySegmentWalker,

Review comment:
       This parameter must be always null per the argument checker below.. Can 
we just remove it?

##########
File path: 
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
##########
@@ -604,48 +557,39 @@ private SegmentsAndCommitMetadata persistAllAndClear()
     final Stopwatch runExecStopwatch = Stopwatch.createStarted();
     final Stopwatch persistStopwatch = Stopwatch.createStarted();
     AtomicLong totalPersistedRows = new AtomicLong(numPersistedRows);
-    final ListenableFuture<Object> future = persistExecutor.submit(
-        new Callable<Object>()
-        {
-          @Override
-          public Object call()
-          {
-            try {
-              for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : 
indexesToPersist) {
-                metrics.incrementRowOutputCount(persistHydrant(pair.lhs, 
pair.rhs));
-              }
-
-              log.info(
-                  "Persisted in-memory data for segments: %s",
-                  indexesToPersist.stream()
-                                  .map(itp -> itp.rhs.asSegmentId().toString())
-                                  .distinct()
-                                  .collect(Collectors.joining(", "))
-              );
-              log.info(
-                  "Persisted stats: processed rows: [%d], persisted rows[%d], 
sinks: [%d], total fireHydrants (across sinks): [%d], persisted fireHydrants 
(across sinks): [%d]",
-                  rowIngestionMeters.getProcessed(),
-                  totalPersistedRows.get(),
-                  totalSinks,
-                  totalHydrantsCount.longValue(),
-                  totalHydrantsPersistedAcrossSinks.longValue()
-              );
-
-              // return null if committer is null
-              return null;
-            }
-            catch (Exception e) {
-              metrics.incrementFailedPersists();
-              throw e;
-            }
-            finally {
-              metrics.incrementNumPersists();
-              
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
-              persistStopwatch.stop();
-            }
-          }
-        }
-    );
+
+    try {
+      for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : indexesToPersist) {
+        metrics.incrementRowOutputCount(persistHydrant(pair.lhs, pair.rhs));
+      }
+
+      log.info(
+          "Persisted in-memory data for segments: %s",
+          indexesToPersist.stream()
+                          .filter(itp -> itp.rhs != null)
+                          .map(itp -> itp.rhs.asSegmentId().toString())
+                          .distinct()
+                          .collect(Collectors.joining(", "))
+      );
+      log.info(
+          "Persisted stats: processed rows: [%d], persisted rows[%d], sinks: 
[%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across 
sinks): [%d]",
+          rowIngestionMeters.getProcessed(),
+          totalPersistedRows.get(),
+          totalSinks,
+          totalHydrantsCount.longValue(),
+          totalHydrantsPersistedAcrossSinks.longValue()
+      );
+
+    }
+    catch (Exception e) {
+      metrics.incrementFailedPersists();
+      throw e;
+    }
+    finally {
+      metrics.incrementNumPersists();
+      
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
+      persistStopwatch.stop();
+    }
 
     final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
     metrics.incrementPersistBackPressureMillis(startDelay);

Review comment:
       This will report a wrong metric because there is no start delay now. I 
think we don't have to report it since we don't use the executor anymore. You 
can remove `runExecStopwatch` too.

##########
File path: 
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
##########
@@ -0,0 +1,1143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.druid.data.input.Committer;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
+import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.FireHydrant;
+import org.apache.druid.segment.realtime.plumber.Sink;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+public class BatchAppenderator implements Appenderator
+{
+  public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
+  // Rough estimate of memory footprint of empty FireHydrant based on actual 
heap dumps
+  public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
+
+  private static final EmittingLogger log = new 
EmittingLogger(BatchAppenderator.class);
+  private static final int WARN_DELAY = 1000;
+  private static final String IDENTIFIER_FILE_NAME = "identifier.json";
+
+  private final String myId;
+  private final DataSchema schema;
+  private final AppenderatorConfig tuningConfig;
+  private final FireDepartmentMetrics metrics;
+  private final DataSegmentPusher dataSegmentPusher;
+  private final ObjectMapper objectMapper;
+  private final IndexIO indexIO;
+  private final IndexMerger indexMerger;
+  /**
+   * This map needs to be concurrent because it's accessed and mutated from 
multiple threads from where
+   * this Appenderator is used (and methods like {@link 
#add(SegmentIdWithShardSpec, InputRow, Supplier, boolean)} are
+   * called). It could also be accessed (but not mutated) potentially in the 
context
+   * of any thread from {@link #drop}.
+   */
+  private final ConcurrentMap<SegmentIdWithShardSpec, Sink> sinks = new 
ConcurrentHashMap<>();
+  private final long maxBytesTuningConfig;
+  private final boolean skipBytesInMemoryOverheadCheck;
+
+  /**
+   * The following sinks metadata map and associated class are the way to 
retain metadata now that sinks
+   * are being completely removed from memory after each incremental persist.
+   */
+  private final ConcurrentHashMap<SegmentIdWithShardSpec, SinkMetadata> 
sinksMetadata = new ConcurrentHashMap<>();
+
+  /**
+   * This class is used for information that needs to be kept related to Sinks 
as
+   * they are persisted and removed from memory at every incremental persist.
+   * The information is used for sanity checks and as information required
+   * for functionality, depending in the field that is used. More info about 
the
+   * fields is annotated as comments in the class
+   */
+  private static class SinkMetadata
+  {
+    /** This is used to maintain the rows in the sink accross persists of the 
sink
+    * used for functionality (i.e. to detect whether an incremental push
+    * is needed {@link AppenderatorDriverAddResult#isPushRequired(Integer, 
Long)}
+    **/
+    private int numRowsInSegment;
+    /** For sanity check as well as functionality: to make sure that all 
hydrants for a sink are restored from disk at
+     * push time and also to remember the fire hydrant "count" when persisting 
it.
+     */
+    private int numHydrants;
+
+    public SinkMetadata()
+    {
+      this(0, 0);
+    }
+
+    public SinkMetadata(int numRowsInSegment, int numHydrants)
+    {
+      this.numRowsInSegment = numRowsInSegment;
+      this.numHydrants = numHydrants;
+    }
+
+    public void addRows(int num)
+    {
+      numRowsInSegment += num;
+    }
+
+    public void addHydrants(int num)
+    {
+      numHydrants += num;
+    }
+
+    public int getNumRowsInSegment()
+    {
+      return numRowsInSegment;
+    }
+
+    public int getNumHydrants()
+    {
+      return numHydrants;
+    }
+
+  }
+
+  // This variable updated in add(), persist(), and drop()
+  private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
+  private final AtomicInteger totalRows = new AtomicInteger();
+  private final AtomicLong bytesCurrentlyInMemory = new AtomicLong();
+  private final RowIngestionMeters rowIngestionMeters;
+  private final ParseExceptionHandler parseExceptionHandler;
+
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private volatile FileLock basePersistDirLock = null;
+  private volatile FileChannel basePersistDirLockChannel = null;
+
+  /**
+   * This constructor allows the caller to provide its own 
SinkQuerySegmentWalker.
+   * <p>
+   * The sinkTimeline is set to the sink timeline of the provided 
SinkQuerySegmentWalker.
+   * If the SinkQuerySegmentWalker is null, a new sink timeline is initialized.
+   * <p>
+   * It is used by UnifiedIndexerAppenderatorsManager which allows queries on 
data associated with multiple
+   * Appenderators.
+   */
+  BatchAppenderator(
+      String id,
+      DataSchema schema,
+      AppenderatorConfig tuningConfig,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      @Nullable SinkQuerySegmentWalker sinkQuerySegmentWalker,
+      IndexIO indexIO,
+      IndexMerger indexMerger,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
+  )
+  {
+    Preconditions.checkArgument(
+        sinkQuerySegmentWalker == null,
+        "Batch appenderator does not use a versioned timeline"
+    );
+
+    this.myId = id;
+    this.schema = Preconditions.checkNotNull(schema, "schema");
+    this.tuningConfig = Preconditions.checkNotNull(tuningConfig, 
"tuningConfig");
+    this.metrics = Preconditions.checkNotNull(metrics, "metrics");
+    this.dataSegmentPusher = Preconditions.checkNotNull(dataSegmentPusher, 
"dataSegmentPusher");
+    this.objectMapper = Preconditions.checkNotNull(objectMapper, 
"objectMapper");
+    this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
+    this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
+    this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, 
"rowIngestionMeters");
+    this.parseExceptionHandler = 
Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
+
+    maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
+    skipBytesInMemoryOverheadCheck = 
tuningConfig.isSkipBytesInMemoryOverheadCheck();
+  }
+
+  @Override
+  public String getId()
+  {
+    return myId;
+  }
+
+  @Override
+  public String getDataSource()
+  {
+    return schema.getDataSource();
+  }
+
+  @Override
+  public Object startJob()
+  {
+    tuningConfig.getBasePersistDirectory().mkdirs();
+    lockBasePersistDirectory();
+    return null;
+  }
+
+  @Override
+  public AppenderatorAddResult add(
+      final SegmentIdWithShardSpec identifier,
+      final InputRow row,
+      @Nullable final Supplier<Committer> committerSupplier,
+      final boolean allowIncrementalPersists
+  ) throws IndexSizeExceededException, SegmentNotWritableException
+  {
+
+    Preconditions.checkArgument(
+        committerSupplier == null,
+        "Batch appenderator does not need a committer!"
+    );
+
+    Preconditions.checkArgument(
+        allowIncrementalPersists,
+        "Batch appenderator should always allow incremental persists!"
+    );
+
+    if (!identifier.getDataSource().equals(schema.getDataSource())) {
+      throw new IAE(
+          "Expected dataSource[%s] but was asked to insert row for 
dataSource[%s]?!",
+          schema.getDataSource(),
+          identifier.getDataSource()
+      );
+    }
+
+    final Sink sink = getOrCreateSink(identifier);
+    metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch());
+    final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
+    final int sinkRowsInMemoryAfterAdd;
+    final long bytesInMemoryBeforeAdd = sink.getBytesInMemory();
+    final long bytesInMemoryAfterAdd;
+    final IncrementalIndexAddResult addResult;
+
+    try {
+      addResult = sink.add(row, false); // allow incrememtal persis is always 
true for batch
+      sinkRowsInMemoryAfterAdd = addResult.getRowCount();
+      bytesInMemoryAfterAdd = addResult.getBytesInMemory();
+    }
+    catch (IndexSizeExceededException e) {
+      // Uh oh, we can't do anything about this! We can't persist (commit 
metadata would be out of sync) and we
+      // can't add the row (it just failed). This should never actually 
happen, though, because we check
+      // sink.canAddRow after returning from add.
+      log.error(e, "Sink for segment[%s] was unexpectedly full!", identifier);
+      throw e;
+    }
+
+    if (sinkRowsInMemoryAfterAdd < 0) {
+      throw new SegmentNotWritableException("Attempt to add row to swapped-out 
sink for segment[%s].", identifier);
+    }
+
+    if (addResult.isRowAdded()) {
+      rowIngestionMeters.incrementProcessed();
+    } else if (addResult.hasParseException()) {
+      parseExceptionHandler.handle(addResult.getParseException());
+    }
+
+    final int numAddedRows = sinkRowsInMemoryAfterAdd - 
sinkRowsInMemoryBeforeAdd;
+    rowsCurrentlyInMemory.addAndGet(numAddedRows);
+    bytesCurrentlyInMemory.addAndGet(bytesInMemoryAfterAdd - 
bytesInMemoryBeforeAdd);
+    totalRows.addAndGet(numAddedRows);
+    sinksMetadata.computeIfAbsent(identifier, unused -> new 
SinkMetadata()).addRows(numAddedRows);
+
+    boolean persist = false;
+    List<String> persistReasons = new ArrayList<>();
+
+    if (!sink.canAppendRow()) {
+      persist = true;
+      persistReasons.add("No more rows can be appended to sink");
+    }
+    if (rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
+      persist = true;
+      persistReasons.add(StringUtils.format(
+          "rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d]",
+          rowsCurrentlyInMemory.get(),
+          tuningConfig.getMaxRowsInMemory()
+      ));
+    }
+    if (bytesCurrentlyInMemory.get() >= maxBytesTuningConfig) {
+      persist = true;
+      persistReasons.add(StringUtils.format(
+          "bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]",
+          bytesCurrentlyInMemory.get(),
+          maxBytesTuningConfig
+      ));
+    }
+    if (persist) {
+      // persistAll clears rowsCurrentlyInMemory, no need to update it.
+      log.info("Incremental persist to disk because %s.", String.join(",", 
persistReasons));
+
+      long bytesToBePersisted = 0L;
+      for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+        final Sink sinkEntry = entry.getValue();
+        if (sinkEntry != null) {
+          bytesToBePersisted += sinkEntry.getBytesInMemory();
+          if (sinkEntry.swappable()) {
+            // Code for batch no longer memory maps hydrants but they still 
take memory...
+            int memoryStillInUse = calculateMemoryUsedByHydrant();
+            bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
+          }
+        }
+      }
+
+      if (!skipBytesInMemoryOverheadCheck
+          && bytesCurrentlyInMemory.get() - bytesToBePersisted > 
maxBytesTuningConfig) {
+        // We are still over maxBytesTuningConfig even after persisting.
+        // This means that we ran out of all available memory to ingest (due 
to overheads created as part of ingestion)
+        final String alertMessage = StringUtils.format(
+            "Task has exceeded safe estimated heap usage limits, failing "
+            + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: 
[%d])"
+            + "(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > 
maxBytesTuningConfig: [%d])",
+            sinks.size(),
+            sinks.values().stream().mapToInt(Iterables::size).sum(),
+            getTotalRowCount(),
+            bytesCurrentlyInMemory.get(),
+            bytesToBePersisted,
+            maxBytesTuningConfig
+        );
+        final String errorMessage = StringUtils.format(
+            "%s.\nThis can occur when the overhead from too many intermediary 
segment persists becomes to "
+            + "great to have enough space to process additional input rows. 
This check, along with metering the overhead "
+            + "of these objects to factor into the 'maxBytesInMemory' 
computation, can be disabled by setting "
+            + "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so 
might allow the task to naturally encounter "
+            + "a 'java.lang.OutOfMemoryError'). Alternatively, 
'maxBytesInMemory' can be increased which will cause an "
+            + "increase in heap footprint, but will allow for more 
intermediary segment persists to occur before "
+            + "reaching this condition.",
+            alertMessage
+        );
+        log.makeAlert(alertMessage)
+           .addData("dataSource", schema.getDataSource())
+           .emit();
+        throw new RuntimeException(errorMessage);
+      }
+
+      persistAllAndClear();
+
+    }
+    return new AppenderatorAddResult(identifier, 
sinksMetadata.get(identifier).numRowsInSegment, false);
+  }
+
+  @Override
+  public List<SegmentIdWithShardSpec> getSegments()
+  {
+    return ImmutableList.copyOf(sinks.keySet());
+  }
+
+  @Override
+  public int getRowCount(final SegmentIdWithShardSpec identifier)
+  {
+    return sinksMetadata.get(identifier).getNumRowsInSegment();
+  }
+
+  @Override
+  public int getTotalRowCount()
+  {
+    return totalRows.get();
+  }
+
+  @VisibleForTesting
+  public int getRowsInMemory()
+  {
+    return rowsCurrentlyInMemory.get();
+  }
+
+  @VisibleForTesting
+  public long getBytesCurrentlyInMemory()
+  {
+    return bytesCurrentlyInMemory.get();
+  }
+
+  @VisibleForTesting
+  public long getBytesInMemory(SegmentIdWithShardSpec identifier)
+  {
+    final Sink sink = sinks.get(identifier);
+
+    if (sink == null) {
+      return 0L; // sinks are removed after a persist
+    } else {
+      return sink.getBytesInMemory();
+    }
+  }
+
+  private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier)
+  {
+    Sink retVal = sinks.get(identifier);
+
+    if (retVal == null) {
+      retVal = new Sink(
+          identifier.getInterval(),
+          schema,
+          identifier.getShardSpec(),
+          identifier.getVersion(),
+          tuningConfig.getAppendableIndexSpec(),
+          tuningConfig.getMaxRowsInMemory(),
+          maxBytesTuningConfig,
+          null
+      );
+      bytesCurrentlyInMemory.addAndGet(calculateSinkMemoryInUsed());
+
+      sinks.put(identifier, retVal);
+      metrics.setSinkCount(sinks.size());
+    }
+
+    return retVal;
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, 
final Iterable<Interval> intervals)
+  {
+    throw new UnsupportedOperationException("No query runner for batch 
appenderator");
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, 
final Iterable<SegmentDescriptor> specs)
+  {
+    throw new UnsupportedOperationException("No query runner for batch 
appenderator");
+  }
+
+  @Override
+  public void clear() throws InterruptedException
+  {
+    clear(true);
+  }
+
+  private void clear(boolean removeOnDiskData)
+  {
+    // Drop commit metadata, then abandon all segments.
+    log.info("Clearing all sinks & hydrants, removing data on disk: [%s]", 
removeOnDiskData);
+    // Drop everything.
+    for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+      removeSink(entry.getKey(), entry.getValue(), removeOnDiskData);
+    }
+  }
+
+  @Override
+  public ListenableFuture<?> drop(final SegmentIdWithShardSpec identifier)
+  {
+    final Sink sink = sinks.get(identifier);
+    SinkMetadata sm = sinksMetadata.remove(identifier);
+    if (sm != null) {
+      int originalTotalRows = getTotalRowCount();
+      int rowsToDrop = sm.getNumRowsInSegment();
+      int totalRowsAfter = originalTotalRows - rowsToDrop;
+      if (totalRowsAfter < 0) {
+        log.warn("Total rows[%d] after dropping segment[%s] rows [%d]", 
totalRowsAfter, identifier, rowsToDrop);
+      }
+      totalRows.set(Math.max(totalRowsAfter, 0));
+    }
+    if (sink != null) {
+      removeSink(identifier, sink, true);
+    }
+    return Futures.immediateFuture(null);
+  }
+
+  private void persistAllAndClear()
+  {
+    // make sure sinks are cleared before push is called
+    try {
+      persistAll(null).get();
+      clear(false);
+    }
+    catch (Throwable t) {
+      throw new RE(t, "Error while persisting");
+    }
+  }
+
+  @Override
+  public ListenableFuture<Object> persistAll(@Nullable final Committer 
committer)
+  {
+    final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = 
new ArrayList<>();
+    int numPersistedRows = 0;
+    long bytesPersisted = 0L;
+    MutableInt totalHydrantsCount = new MutableInt();
+    MutableInt totalHydrantsPersistedAcrossSinks = new MutableInt();
+    final long totalSinks = sinks.size();
+    for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+      final SegmentIdWithShardSpec identifier = entry.getKey();
+      final Sink sink = entry.getValue();
+      if (sink == null) {
+        throw new ISE("No sink for identifier: %s", identifier);
+      }
+
+      final List<FireHydrant> hydrants = Lists.newArrayList(sink);
+      totalHydrantsCount.add(hydrants.size());
+      numPersistedRows += sink.getNumRowsInMemory();
+      bytesPersisted += sink.getBytesInMemory();
+
+      final int limit = sink.isWritable() ? hydrants.size() - 1 : 
hydrants.size();
+
+      // gather hydrants that have not been persisted:
+      for (FireHydrant hydrant : hydrants.subList(0, limit)) {
+        if (!hydrant.hasSwapped()) {
+          log.debug("Hydrant[%s] hasn't persisted yet, persisting. 
Segment[%s]", hydrant, identifier);
+          indexesToPersist.add(Pair.of(hydrant, identifier));
+          totalHydrantsPersistedAcrossSinks.add(1);
+        }
+      }
+
+      if (sink.swappable()) {
+        // It is swappable. Get the old one to persist it and create a new one:
+        indexesToPersist.add(Pair.of(sink.swap(), identifier));
+        totalHydrantsPersistedAcrossSinks.add(1);
+      }
+
+    }
+    log.debug("Submitting persist runnable for dataSource[%s]", 
schema.getDataSource());
+
+    if (indexesToPersist.isEmpty()) {
+      log.info("No indexes will be peristed");
+    }
+    final Stopwatch runExecStopwatch = Stopwatch.createStarted();
+    final Stopwatch persistStopwatch = Stopwatch.createStarted();
+    AtomicLong totalPersistedRows = new AtomicLong(numPersistedRows);
+
+    try {
+      for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : indexesToPersist) {
+        metrics.incrementRowOutputCount(persistHydrant(pair.lhs, pair.rhs));
+      }
+
+      log.info(
+          "Persisted in-memory data for segments: %s",
+          indexesToPersist.stream()
+                          .filter(itp -> itp.rhs != null)
+                          .map(itp -> itp.rhs.asSegmentId().toString())
+                          .distinct()
+                          .collect(Collectors.joining(", "))
+      );
+      log.info(
+          "Persisted stats: processed rows: [%d], persisted rows[%d], sinks: 
[%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across 
sinks): [%d]",
+          rowIngestionMeters.getProcessed(),
+          totalPersistedRows.get(),
+          totalSinks,
+          totalHydrantsCount.longValue(),
+          totalHydrantsPersistedAcrossSinks.longValue()
+      );
+
+    }
+    catch (Exception e) {
+      metrics.incrementFailedPersists();
+      throw e;
+    }
+    finally {
+      metrics.incrementNumPersists();
+      
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
+      persistStopwatch.stop();
+    }
+
+    final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
+    metrics.incrementPersistBackPressureMillis(startDelay);
+    if (startDelay > WARN_DELAY) {
+      log.warn("Ingestion was throttled for [%,d] millis because persists were 
pending.", startDelay);
+    }
+    runExecStopwatch.stop();
+
+    // NB: The rows are still in memory until they're done persisting, but we 
only count rows in active indexes.
+    rowsCurrentlyInMemory.addAndGet(-numPersistedRows);
+    bytesCurrentlyInMemory.addAndGet(-bytesPersisted);
+
+    log.info("Persisted rows[%,d] and bytes[%,d]", numPersistedRows, 
bytesPersisted);
+
+    return Futures.immediateFuture(null);
+  }
+
+  @Override
+  public ListenableFuture<SegmentsAndCommitMetadata> push(
+      final Collection<SegmentIdWithShardSpec> identifiers,
+      @Nullable final Committer committer,
+      final boolean useUniquePath
+  )
+  {
+
+    if (committer != null) {
+      throw new ISE("There should be no committer for batch ingestion");
+    }
+
+    // Any sinks not persisted so far will be persisted before push:
+    persistAllAndClear();
+
+    log.info("Preparing to push...");
+    final List<DataSegment> dataSegments = new ArrayList<>();
+    List<File> persistedIdentifiers = getPersistedidentifierPaths();
+    if (persistedIdentifiers == null) {
+      throw new ISE("Identifiers were persisted but could not be retrieved");
+    }
+    for (File identifier : persistedIdentifiers) {
+      Pair<SegmentIdWithShardSpec, Sink> identifiersAndSinks;
+      try {
+        identifiersAndSinks = getIdentifierAndSinkForPersistedFile(identifier);
+      }
+      catch (IOException e) {
+        throw new ISE(e, "Failed to retrieve sinks for identifier", 
identifier);
+      }
+      final DataSegment dataSegment = mergeAndPush(
+          identifiersAndSinks.lhs,
+          identifiersAndSinks.rhs,
+          useUniquePath
+      );
+      if (dataSegment != null) {
+        dataSegments.add(dataSegment);
+      } else {
+        log.warn("mergeAndPush[%s] returned null, skipping.", 
identifiersAndSinks.lhs);
+      }
+    }
+    log.info("Push complete...");
+
+    return Futures.immediateFuture(new SegmentsAndCommitMetadata(dataSegments, 
null));
+  }
+
+  /**
+   * Merge segment, push to deep storage. Should only be used on segments that 
have been fully persisted.
+   *
+   * @param identifier    sink identifier
+   * @param sink          sink to push
+   * @param useUniquePath true if the segment should be written to a path with 
a unique identifier
+   * @return segment descriptor, or null if the sink is no longer valid
+   */
+  @Nullable
+  private DataSegment mergeAndPush(
+      final SegmentIdWithShardSpec identifier,
+      final Sink sink,
+      final boolean useUniquePath
+  )
+  {
+
+    // Use a descriptor file to indicate that pushing has completed.
+    final File persistDir = computePersistDir(identifier);
+    final File mergedTarget = new File(persistDir, "merged");
+    final File descriptorFile = computeDescriptorFile(identifier);
+
+    // Sanity checks
+    if (sink.isWritable()) {
+      throw new ISE("Expected sink to be no longer writable before 
mergeAndPush for segment[%s].", identifier);
+    }
+
+    int numHydrants = 0;
+    for (FireHydrant hydrant : sink) {
+      synchronized (hydrant) {
+        if (!hydrant.hasSwapped()) {
+          throw new ISE("Expected sink to be fully persisted before 
mergeAndPush for segment[%s].", identifier);
+        }
+      }
+      numHydrants++;
+    }
+
+    SinkMetadata sm = sinksMetadata.get(identifier);
+    if (sm == null) {
+      log.warn("Sink metadata not found just before merge for identifier 
[%s]", identifier);
+    } else if (numHydrants != sm.getNumHydrants()) {
+      throw new ISE("Number of restored hydrants[%d] for identifier[%s] does 
not match expected value[%d]",
+                    numHydrants, identifier, 
sinksMetadata.get(identifier).getNumHydrants());
+    }
+
+    try {
+      if (descriptorFile.exists()) {
+        // Already pushed.
+
+        if (useUniquePath) {
+          // Don't reuse the descriptor, because the caller asked for a unique 
path. Leave the old one as-is, since
+          // it might serve some unknown purpose.
+          log.debug(
+              "Segment[%s] already pushed, but we want a unique path, so will 
push again with a new path.",
+              identifier
+          );
+        } else {
+          log.info("Segment[%s] already pushed, skipping.", identifier);
+          return objectMapper.readValue(descriptorFile, DataSegment.class);
+        }
+      }
+
+      removeDirectory(mergedTarget);
+
+      if (mergedTarget.exists()) {
+        throw new ISE("Merged target[%s] exists after removing?!", 
mergedTarget);
+      }
+
+      final File mergedFile;
+      final long mergeFinishTime;
+      final long startTime = System.nanoTime();
+      List<QueryableIndex> indexes = new ArrayList<>();
+      Closer closer = Closer.create();
+      try {
+        for (FireHydrant fireHydrant : sink) {
+          Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = 
fireHydrant.getAndIncrementSegment();
+          final QueryableIndex queryableIndex = 
segmentAndCloseable.lhs.asQueryableIndex();
+          log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant);
+          indexes.add(queryableIndex);
+          closer.register(segmentAndCloseable.rhs);
+        }
+
+        mergedFile = indexMerger.mergeQueryableIndex(
+            indexes,
+            schema.getGranularitySpec().isRollup(),
+            schema.getAggregators(),
+            schema.getDimensionsSpec(),
+            mergedTarget,
+            tuningConfig.getIndexSpec(),
+            tuningConfig.getSegmentWriteOutMediumFactory(),
+            tuningConfig.getMaxColumnsToMerge()
+        );
+
+        mergeFinishTime = System.nanoTime();
+
+        log.debug("Segment[%s] built in %,dms.", identifier, (mergeFinishTime 
- startTime) / 1000000);
+      }
+      catch (Throwable t) {
+        throw closer.rethrow(t);
+      }
+      finally {
+        closer.close();
+      }
+
+      // Retry pushing segments because uploading to deep storage might fail 
especially for cloud storage types
+      final DataSegment segment = RetryUtils.retry(
+          // The appenderator is currently being used for the local indexing 
task and the Kafka indexing task. For the
+          // Kafka indexing task, pushers must use unique file paths in deep 
storage in order to maintain exactly-once
+          // semantics.
+          () -> dataSegmentPusher.push(
+              mergedFile,
+              sink.getSegment()
+                  
.withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(
+                      indexes,
+                      schema.getDimensionsSpec()
+                  )),
+              useUniquePath
+          ),
+          exception -> exception instanceof Exception,
+          5
+      );
+
+      // Drop the queryable indexes behind the hydrants... they are not needed 
anymore and their
+      // mapped file references
+      // can generate OOMs during merge if enough of them are held back...
+      // agfixme: Since we cannot keep sinks due to memory growth then we have 
to add the sink metadata table and keep it up to date
+      //sinks.put(identifier,sink);
+      for (FireHydrant fireHydrant : sink) {
+        fireHydrant.swapSegment(null);
+      }
+
+      // cleanup, sink no longer needed
+      removeDirectory(computePersistDir(identifier));
+
+      final long pushFinishTime = System.nanoTime();
+
+      log.info(
+          "Segment[%s] of %,d bytes "
+          + "built from %d incremental persist(s) in %,dms; "
+          + "pushed to deep storage in %,dms. "
+          + "Load spec is: %s",
+          identifier,
+          segment.getSize(),
+          indexes.size(),
+          (mergeFinishTime - startTime) / 1000000,
+          (pushFinishTime - mergeFinishTime) / 1000000,
+          objectMapper.writeValueAsString(segment.getLoadSpec())
+      );
+
+      return segment;
+    }
+    catch (Exception e) {
+      metrics.incrementFailedHandoffs();
+      log.warn(e, "Failed to push merged index for segment[%s].", identifier);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close()
+  {
+    if (!closed.compareAndSet(false, true)) {
+      log.debug("Appenderator already closed, skipping close() call.");
+      return;
+    }
+
+    log.debug("Shutting down...");
+
+    for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+      removeSink(entry.getKey(), entry.getValue(), false);
+    }
+
+    unlockBasePersistDirectory();
+
+    // cleanup:
+    List<File> persistedIdentifiers = getPersistedidentifierPaths();
+    if (persistedIdentifiers != null) {
+      for (File identifier : persistedIdentifiers) {
+        removeDirectory(identifier);
+      }
+    }
+
+    totalRows.set(0);
+    sinksMetadata.clear();
+  }
+
+  /**
+    Nothing to do since there are no executors
+   */
+  @Override
+  public void closeNow()
+  {
+    if (!closed.compareAndSet(false, true)) {
+      log.debug("Appenderator already closed, skipping closeNow() call.");
+      return;
+    }
+
+    log.debug("Shutting down immediately...");
+  }
+
+  private void lockBasePersistDirectory()
+  {
+    if (basePersistDirLock == null) {
+      try {
+        basePersistDirLockChannel = FileChannel.open(
+            computeLockFile().toPath(),
+            StandardOpenOption.CREATE,
+            StandardOpenOption.WRITE
+        );
+
+        basePersistDirLock = basePersistDirLockChannel.tryLock();
+        if (basePersistDirLock == null) {
+          throw new ISE("Cannot acquire lock on basePersistDir: %s", 
computeLockFile());
+        }
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private void unlockBasePersistDirectory()
+  {
+    try {
+      if (basePersistDirLock != null) {
+        basePersistDirLock.release();
+        basePersistDirLockChannel.close();
+        basePersistDirLock = null;
+      }
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  @Nullable
+  public List<File> getPersistedidentifierPaths()
+  {
+
+    ArrayList<File> retVal = new ArrayList<>();
+
+    final File baseDir = tuningConfig.getBasePersistDirectory();
+    if (!baseDir.exists()) {
+      return null;
+    }
+
+    final File[] files = baseDir.listFiles();
+    if (files == null) {
+      return null;
+    }
+
+    for (File sinkDir : files) {
+      final File identifierFile = new File(sinkDir, IDENTIFIER_FILE_NAME);
+      if (!identifierFile.isFile()) {
+        // No identifier in this sinkDir; it must not actually be a sink 
directory. Skip it.
+        continue;
+      }
+      retVal.add(sinkDir);
+    }
+
+    return retVal;
+  }
+
+  private Pair<SegmentIdWithShardSpec, Sink> 
getIdentifierAndSinkForPersistedFile(File identifierPath)
+      throws IOException
+  {
+
+    final SegmentIdWithShardSpec identifier = objectMapper.readValue(
+        new File(identifierPath, IDENTIFIER_FILE_NAME),
+        SegmentIdWithShardSpec.class
+    );
+
+    // To avoid reading and listing of "merged" dir and other special files
+    final File[] sinkFiles = identifierPath.listFiles(
+        (dir, fileName) -> !(Ints.tryParse(fileName) == null)
+    );
+    if (sinkFiles == null) {
+      throw new ISE("Problem reading persisted sinks in path", identifierPath);
+    }
+
+    Arrays.sort(
+        sinkFiles,
+        (o1, o2) -> Ints.compare(Integer.parseInt(o1.getName()), 
Integer.parseInt(o2.getName()))
+    );
+
+    List<FireHydrant> hydrants = new ArrayList<>();
+    for (File hydrantDir : sinkFiles) {
+      final int hydrantNumber = Integer.parseInt(hydrantDir.getName());
+
+      log.debug("Loading previously persisted partial segment at [%s]", 
hydrantDir);
+      if (hydrantNumber != hydrants.size()) {
+        throw new ISE("Missing hydrant [%,d] in identifier [%s].", 
hydrants.size(), identifier);
+      }
+
+      hydrants.add(
+          new FireHydrant(
+              new QueryableIndexSegment(indexIO.loadIndex(hydrantDir), 
identifier.asSegmentId()),
+              hydrantNumber
+          )
+      );
+    }
+
+    Sink currSink = new Sink(
+        identifier.getInterval(),
+        schema,
+        identifier.getShardSpec(),
+        identifier.getVersion(),
+        tuningConfig.getAppendableIndexSpec(),
+        tuningConfig.getMaxRowsInMemory(),
+        maxBytesTuningConfig,
+        null,
+        hydrants
+    );
+    currSink.finishWriting(); // this sink is not writable
+    return new Pair<>(identifier, currSink);
+  }
+
+  private void removeSink(
+      final SegmentIdWithShardSpec identifier,
+      final Sink sink,
+      final boolean removeOnDiskData
+  )
+  {
+    // Ensure no future writes will be made to this sink.
+    if (sink.finishWriting()) {
+      // Decrement this sink's rows from the counters. we only count active 
sinks so that we don't double decrement,
+      // i.e. those that haven't been persisted for *InMemory counters, or 
pushed to deep storage for the total counter.
+      rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
+      bytesCurrentlyInMemory.addAndGet(-sink.getBytesInMemory());
+      bytesCurrentlyInMemory.addAndGet(-calculateSinkMemoryInUsed());
+      for (FireHydrant hydrant : sink) {
+        // Decrement memory used by all Memory Mapped Hydrant
+        if (!hydrant.equals(sink.getCurrHydrant())) {
+          bytesCurrentlyInMemory.addAndGet(-calculateMemoryUsedByHydrant());
+        }
+      }
+      // totalRows are not decremented when removing the sink from memory, 
sink was just persisted and it
+      // still "lives" but it is in hibernation. It will be revived later just 
before push.
+    }
+
+
+    if (!sinks.remove(identifier, sink)) {
+      log.error("Sink for segment[%s] no longer valid, not abandoning.", 
identifier);
+    }
+
+    metrics.setSinkCount(sinks.size());
+
+    if (removeOnDiskData) {
+      removeDirectory(computePersistDir(identifier));
+    }
+
+    log.info("Removed sink for segment[%s].", identifier);
+
+  }
+
+  private File computeLockFile()
+  {
+    return new File(tuningConfig.getBasePersistDirectory(), ".lock");
+  }
+
+  private File computePersistDir(SegmentIdWithShardSpec identifier)
+  {
+    return new File(tuningConfig.getBasePersistDirectory(), 
identifier.toString());
+  }
+
+  private File computeIdentifierFile(SegmentIdWithShardSpec identifier)
+  {
+    return new File(computePersistDir(identifier), IDENTIFIER_FILE_NAME);
+  }
+
+  private File computeDescriptorFile(SegmentIdWithShardSpec identifier)
+  {
+    return new File(computePersistDir(identifier), "descriptor.json");
+  }
+
+  private File createPersistDirIfNeeded(SegmentIdWithShardSpec identifier) 
throws IOException
+  {
+    final File persistDir = computePersistDir(identifier);
+    org.apache.commons.io.FileUtils.forceMkdir(persistDir);
+
+    objectMapper.writeValue(computeIdentifierFile(identifier), identifier);
+
+    return persistDir;
+  }
+
+  /**
+   * Persists the given hydrant and returns the number of rows persisted.
+   *
+   * @param indexToPersist hydrant to persist
+   * @param identifier     the segment this hydrant is going to be part of
+   * @return the number of rows persisted
+   */
+  private int persistHydrant(FireHydrant indexToPersist, 
SegmentIdWithShardSpec identifier)
+  {
+    synchronized (indexToPersist) {
+      if (indexToPersist.hasSwapped()) {
+        log.info(
+            "Segment[%s] hydrant[%s] already swapped. Ignoring request to 
persist.",
+            identifier,
+            indexToPersist
+        );
+        return 0;
+      }
+
+      log.debug("Segment[%s], persisting Hydrant[%s]", identifier, 
indexToPersist);
+
+      try {
+        final long startTime = System.nanoTime();
+        int numRows = indexToPersist.getIndex().size();
+
+        // since the sink may have been persisted before it may have lost its
+        // hydrant count, we remember that value in the sinks metadata so we 
have
+        // to pull it from there....
+        SinkMetadata sm = sinksMetadata.get(identifier);
+        if (sm == null) {
+          throw new ISE("Sink must not be null for identifier when persisting 
hydrant", identifier);

Review comment:
       ```suggestion
             throw new ISE("Sink must not be null for identifier[%s] when 
persisting hydrant", identifier);
   ```

##########
File path: 
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
##########
@@ -172,7 +172,7 @@ SegmentWithState getAppendingSegment()
   /**
    * Allocated segments for a sequence
    */
-  static class SegmentsForSequence
+  public static class SegmentsForSequence

Review comment:
       `BatchAppenderatorDriver` is in the same package as 
`BaseAppenderatorDriver`. I assume you meant `BatchAppenderatorDriverTest` 
which is the class you moved to another package. As I said in my other comment, 
the class to test and its corresponding test class should be in the same 
package to help the test coverage bot. I suggested to not move the package of 
`BatchAppenderatorDriverTest` and thus you will not need to change this access 
modifier either. Same for other access modifier changes in this class.

##########
File path: 
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
##########
@@ -0,0 +1,1143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.druid.data.input.Committer;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
+import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.FireHydrant;
+import org.apache.druid.segment.realtime.plumber.Sink;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+public class BatchAppenderator implements Appenderator
+{
+  public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
+  // Rough estimate of memory footprint of empty FireHydrant based on actual 
heap dumps
+  public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
+
+  private static final EmittingLogger log = new 
EmittingLogger(BatchAppenderator.class);
+  private static final int WARN_DELAY = 1000;
+  private static final String IDENTIFIER_FILE_NAME = "identifier.json";
+
+  private final String myId;
+  private final DataSchema schema;
+  private final AppenderatorConfig tuningConfig;
+  private final FireDepartmentMetrics metrics;
+  private final DataSegmentPusher dataSegmentPusher;
+  private final ObjectMapper objectMapper;
+  private final IndexIO indexIO;
+  private final IndexMerger indexMerger;
+  /**
+   * This map needs to be concurrent because it's accessed and mutated from 
multiple threads from where
+   * this Appenderator is used (and methods like {@link 
#add(SegmentIdWithShardSpec, InputRow, Supplier, boolean)} are
+   * called). It could also be accessed (but not mutated) potentially in the 
context
+   * of any thread from {@link #drop}.
+   */
+  private final ConcurrentMap<SegmentIdWithShardSpec, Sink> sinks = new 
ConcurrentHashMap<>();
+  private final long maxBytesTuningConfig;
+  private final boolean skipBytesInMemoryOverheadCheck;
+
+  /**
+   * The following sinks metadata map and associated class are the way to 
retain metadata now that sinks
+   * are being completely removed from memory after each incremental persist.
+   */
+  private final ConcurrentHashMap<SegmentIdWithShardSpec, SinkMetadata> 
sinksMetadata = new ConcurrentHashMap<>();
+
+  /**
+   * This class is used for information that needs to be kept related to Sinks 
as
+   * they are persisted and removed from memory at every incremental persist.
+   * The information is used for sanity checks and as information required
+   * for functionality, depending in the field that is used. More info about 
the
+   * fields is annotated as comments in the class
+   */
+  private static class SinkMetadata
+  {
+    /** This is used to maintain the rows in the sink accross persists of the 
sink
+    * used for functionality (i.e. to detect whether an incremental push
+    * is needed {@link AppenderatorDriverAddResult#isPushRequired(Integer, 
Long)}
+    **/
+    private int numRowsInSegment;
+    /** For sanity check as well as functionality: to make sure that all 
hydrants for a sink are restored from disk at
+     * push time and also to remember the fire hydrant "count" when persisting 
it.
+     */
+    private int numHydrants;
+
+    public SinkMetadata()
+    {
+      this(0, 0);
+    }
+
+    public SinkMetadata(int numRowsInSegment, int numHydrants)
+    {
+      this.numRowsInSegment = numRowsInSegment;
+      this.numHydrants = numHydrants;
+    }
+
+    public void addRows(int num)
+    {
+      numRowsInSegment += num;
+    }
+
+    public void addHydrants(int num)
+    {
+      numHydrants += num;
+    }
+
+    public int getNumRowsInSegment()
+    {
+      return numRowsInSegment;
+    }
+
+    public int getNumHydrants()
+    {
+      return numHydrants;
+    }
+
+  }
+
+  // This variable updated in add(), persist(), and drop()
+  private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
+  private final AtomicInteger totalRows = new AtomicInteger();
+  private final AtomicLong bytesCurrentlyInMemory = new AtomicLong();
+  private final RowIngestionMeters rowIngestionMeters;
+  private final ParseExceptionHandler parseExceptionHandler;
+
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private volatile FileLock basePersistDirLock = null;
+  private volatile FileChannel basePersistDirLockChannel = null;
+
+  /**
+   * This constructor allows the caller to provide its own 
SinkQuerySegmentWalker.
+   * <p>
+   * The sinkTimeline is set to the sink timeline of the provided 
SinkQuerySegmentWalker.
+   * If the SinkQuerySegmentWalker is null, a new sink timeline is initialized.
+   * <p>
+   * It is used by UnifiedIndexerAppenderatorsManager which allows queries on 
data associated with multiple
+   * Appenderators.

Review comment:
       This javadoc is no longer correct. I think you can simply delete it.

##########
File path: 
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
##########
@@ -0,0 +1,1143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.druid.data.input.Committer;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
+import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.FireHydrant;
+import org.apache.druid.segment.realtime.plumber.Sink;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+public class BatchAppenderator implements Appenderator
+{
+  public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
+  // Rough estimate of memory footprint of empty FireHydrant based on actual 
heap dumps
+  public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
+
+  private static final EmittingLogger log = new 
EmittingLogger(BatchAppenderator.class);
+  private static final int WARN_DELAY = 1000;
+  private static final String IDENTIFIER_FILE_NAME = "identifier.json";
+
+  private final String myId;
+  private final DataSchema schema;
+  private final AppenderatorConfig tuningConfig;
+  private final FireDepartmentMetrics metrics;
+  private final DataSegmentPusher dataSegmentPusher;
+  private final ObjectMapper objectMapper;
+  private final IndexIO indexIO;
+  private final IndexMerger indexMerger;
+  /**
+   * This map needs to be concurrent because it's accessed and mutated from 
multiple threads from where
+   * this Appenderator is used (and methods like {@link 
#add(SegmentIdWithShardSpec, InputRow, Supplier, boolean)} are
+   * called). It could also be accessed (but not mutated) potentially in the 
context
+   * of any thread from {@link #drop}.
+   */
+  private final ConcurrentMap<SegmentIdWithShardSpec, Sink> sinks = new 
ConcurrentHashMap<>();
+  private final long maxBytesTuningConfig;
+  private final boolean skipBytesInMemoryOverheadCheck;
+
+  /**
+   * The following sinks metadata map and associated class are the way to 
retain metadata now that sinks
+   * are being completely removed from memory after each incremental persist.
+   */
+  private final ConcurrentHashMap<SegmentIdWithShardSpec, SinkMetadata> 
sinksMetadata = new ConcurrentHashMap<>();
+
+  /**
+   * This class is used for information that needs to be kept related to Sinks 
as
+   * they are persisted and removed from memory at every incremental persist.
+   * The information is used for sanity checks and as information required
+   * for functionality, depending in the field that is used. More info about 
the
+   * fields is annotated as comments in the class
+   */
+  private static class SinkMetadata
+  {
+    /** This is used to maintain the rows in the sink accross persists of the 
sink
+    * used for functionality (i.e. to detect whether an incremental push
+    * is needed {@link AppenderatorDriverAddResult#isPushRequired(Integer, 
Long)}
+    **/
+    private int numRowsInSegment;
+    /** For sanity check as well as functionality: to make sure that all 
hydrants for a sink are restored from disk at
+     * push time and also to remember the fire hydrant "count" when persisting 
it.
+     */
+    private int numHydrants;
+
+    public SinkMetadata()
+    {
+      this(0, 0);
+    }
+
+    public SinkMetadata(int numRowsInSegment, int numHydrants)
+    {
+      this.numRowsInSegment = numRowsInSegment;
+      this.numHydrants = numHydrants;
+    }
+
+    public void addRows(int num)
+    {
+      numRowsInSegment += num;
+    }
+
+    public void addHydrants(int num)
+    {
+      numHydrants += num;
+    }
+
+    public int getNumRowsInSegment()
+    {
+      return numRowsInSegment;
+    }
+
+    public int getNumHydrants()
+    {
+      return numHydrants;
+    }
+
+  }
+
+  // This variable updated in add(), persist(), and drop()
+  private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
+  private final AtomicInteger totalRows = new AtomicInteger();
+  private final AtomicLong bytesCurrentlyInMemory = new AtomicLong();
+  private final RowIngestionMeters rowIngestionMeters;
+  private final ParseExceptionHandler parseExceptionHandler;
+
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private volatile FileLock basePersistDirLock = null;
+  private volatile FileChannel basePersistDirLockChannel = null;
+
+  /**
+   * This constructor allows the caller to provide its own 
SinkQuerySegmentWalker.
+   * <p>
+   * The sinkTimeline is set to the sink timeline of the provided 
SinkQuerySegmentWalker.
+   * If the SinkQuerySegmentWalker is null, a new sink timeline is initialized.
+   * <p>
+   * It is used by UnifiedIndexerAppenderatorsManager which allows queries on 
data associated with multiple
+   * Appenderators.
+   */
+  BatchAppenderator(
+      String id,
+      DataSchema schema,
+      AppenderatorConfig tuningConfig,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      @Nullable SinkQuerySegmentWalker sinkQuerySegmentWalker,
+      IndexIO indexIO,
+      IndexMerger indexMerger,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
+  )
+  {
+    Preconditions.checkArgument(
+        sinkQuerySegmentWalker == null,
+        "Batch appenderator does not use a versioned timeline"
+    );
+
+    this.myId = id;
+    this.schema = Preconditions.checkNotNull(schema, "schema");
+    this.tuningConfig = Preconditions.checkNotNull(tuningConfig, 
"tuningConfig");
+    this.metrics = Preconditions.checkNotNull(metrics, "metrics");
+    this.dataSegmentPusher = Preconditions.checkNotNull(dataSegmentPusher, 
"dataSegmentPusher");
+    this.objectMapper = Preconditions.checkNotNull(objectMapper, 
"objectMapper");
+    this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
+    this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
+    this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, 
"rowIngestionMeters");
+    this.parseExceptionHandler = 
Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
+
+    maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
+    skipBytesInMemoryOverheadCheck = 
tuningConfig.isSkipBytesInMemoryOverheadCheck();
+  }
+
+  @Override
+  public String getId()
+  {
+    return myId;
+  }
+
+  @Override
+  public String getDataSource()
+  {
+    return schema.getDataSource();
+  }
+
+  @Override
+  public Object startJob()
+  {
+    tuningConfig.getBasePersistDirectory().mkdirs();
+    lockBasePersistDirectory();
+    return null;
+  }
+
+  @Override
+  public AppenderatorAddResult add(
+      final SegmentIdWithShardSpec identifier,
+      final InputRow row,
+      @Nullable final Supplier<Committer> committerSupplier,
+      final boolean allowIncrementalPersists
+  ) throws IndexSizeExceededException, SegmentNotWritableException
+  {
+
+    Preconditions.checkArgument(
+        committerSupplier == null,
+        "Batch appenderator does not need a committer!"
+    );
+
+    Preconditions.checkArgument(
+        allowIncrementalPersists,
+        "Batch appenderator should always allow incremental persists!"
+    );
+
+    if (!identifier.getDataSource().equals(schema.getDataSource())) {
+      throw new IAE(
+          "Expected dataSource[%s] but was asked to insert row for 
dataSource[%s]?!",
+          schema.getDataSource(),
+          identifier.getDataSource()
+      );
+    }
+
+    final Sink sink = getOrCreateSink(identifier);
+    metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch());
+    final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
+    final int sinkRowsInMemoryAfterAdd;
+    final long bytesInMemoryBeforeAdd = sink.getBytesInMemory();
+    final long bytesInMemoryAfterAdd;
+    final IncrementalIndexAddResult addResult;
+
+    try {
+      addResult = sink.add(row, false); // allow incrememtal persis is always 
true for batch
+      sinkRowsInMemoryAfterAdd = addResult.getRowCount();
+      bytesInMemoryAfterAdd = addResult.getBytesInMemory();
+    }
+    catch (IndexSizeExceededException e) {
+      // Uh oh, we can't do anything about this! We can't persist (commit 
metadata would be out of sync) and we
+      // can't add the row (it just failed). This should never actually 
happen, though, because we check
+      // sink.canAddRow after returning from add.
+      log.error(e, "Sink for segment[%s] was unexpectedly full!", identifier);
+      throw e;
+    }
+
+    if (sinkRowsInMemoryAfterAdd < 0) {
+      throw new SegmentNotWritableException("Attempt to add row to swapped-out 
sink for segment[%s].", identifier);
+    }
+
+    if (addResult.isRowAdded()) {
+      rowIngestionMeters.incrementProcessed();
+    } else if (addResult.hasParseException()) {
+      parseExceptionHandler.handle(addResult.getParseException());
+    }
+
+    final int numAddedRows = sinkRowsInMemoryAfterAdd - 
sinkRowsInMemoryBeforeAdd;
+    rowsCurrentlyInMemory.addAndGet(numAddedRows);
+    bytesCurrentlyInMemory.addAndGet(bytesInMemoryAfterAdd - 
bytesInMemoryBeforeAdd);
+    totalRows.addAndGet(numAddedRows);
+    sinksMetadata.computeIfAbsent(identifier, unused -> new 
SinkMetadata()).addRows(numAddedRows);
+
+    boolean persist = false;
+    List<String> persistReasons = new ArrayList<>();
+
+    if (!sink.canAppendRow()) {
+      persist = true;
+      persistReasons.add("No more rows can be appended to sink");
+    }
+    if (rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
+      persist = true;
+      persistReasons.add(StringUtils.format(
+          "rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d]",
+          rowsCurrentlyInMemory.get(),
+          tuningConfig.getMaxRowsInMemory()
+      ));
+    }
+    if (bytesCurrentlyInMemory.get() >= maxBytesTuningConfig) {
+      persist = true;
+      persistReasons.add(StringUtils.format(
+          "bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]",
+          bytesCurrentlyInMemory.get(),
+          maxBytesTuningConfig
+      ));
+    }
+    if (persist) {
+      // persistAll clears rowsCurrentlyInMemory, no need to update it.
+      log.info("Incremental persist to disk because %s.", String.join(",", 
persistReasons));
+
+      long bytesToBePersisted = 0L;
+      for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+        final Sink sinkEntry = entry.getValue();
+        if (sinkEntry != null) {
+          bytesToBePersisted += sinkEntry.getBytesInMemory();
+          if (sinkEntry.swappable()) {
+            // Code for batch no longer memory maps hydrants but they still 
take memory...
+            int memoryStillInUse = calculateMemoryUsedByHydrant();
+            bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
+          }
+        }
+      }
+
+      if (!skipBytesInMemoryOverheadCheck
+          && bytesCurrentlyInMemory.get() - bytesToBePersisted > 
maxBytesTuningConfig) {
+        // We are still over maxBytesTuningConfig even after persisting.
+        // This means that we ran out of all available memory to ingest (due 
to overheads created as part of ingestion)
+        final String alertMessage = StringUtils.format(
+            "Task has exceeded safe estimated heap usage limits, failing "
+            + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: 
[%d])"
+            + "(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > 
maxBytesTuningConfig: [%d])",
+            sinks.size(),
+            sinks.values().stream().mapToInt(Iterables::size).sum(),
+            getTotalRowCount(),
+            bytesCurrentlyInMemory.get(),
+            bytesToBePersisted,
+            maxBytesTuningConfig
+        );
+        final String errorMessage = StringUtils.format(
+            "%s.\nThis can occur when the overhead from too many intermediary 
segment persists becomes to "
+            + "great to have enough space to process additional input rows. 
This check, along with metering the overhead "
+            + "of these objects to factor into the 'maxBytesInMemory' 
computation, can be disabled by setting "
+            + "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so 
might allow the task to naturally encounter "
+            + "a 'java.lang.OutOfMemoryError'). Alternatively, 
'maxBytesInMemory' can be increased which will cause an "
+            + "increase in heap footprint, but will allow for more 
intermediary segment persists to occur before "
+            + "reaching this condition.",
+            alertMessage
+        );
+        log.makeAlert(alertMessage)
+           .addData("dataSource", schema.getDataSource())
+           .emit();
+        throw new RuntimeException(errorMessage);
+      }
+
+      persistAllAndClear();
+
+    }
+    return new AppenderatorAddResult(identifier, 
sinksMetadata.get(identifier).numRowsInSegment, false);
+  }
+
+  @Override
+  public List<SegmentIdWithShardSpec> getSegments()
+  {
+    return ImmutableList.copyOf(sinks.keySet());
+  }
+
+  @Override
+  public int getRowCount(final SegmentIdWithShardSpec identifier)
+  {
+    return sinksMetadata.get(identifier).getNumRowsInSegment();
+  }
+
+  @Override
+  public int getTotalRowCount()
+  {
+    return totalRows.get();
+  }
+
+  @VisibleForTesting
+  public int getRowsInMemory()
+  {
+    return rowsCurrentlyInMemory.get();
+  }
+
+  @VisibleForTesting
+  public long getBytesCurrentlyInMemory()
+  {
+    return bytesCurrentlyInMemory.get();
+  }
+
+  @VisibleForTesting
+  public long getBytesInMemory(SegmentIdWithShardSpec identifier)
+  {
+    final Sink sink = sinks.get(identifier);
+
+    if (sink == null) {
+      return 0L; // sinks are removed after a persist
+    } else {
+      return sink.getBytesInMemory();
+    }
+  }
+
+  private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier)
+  {
+    Sink retVal = sinks.get(identifier);
+
+    if (retVal == null) {
+      retVal = new Sink(
+          identifier.getInterval(),
+          schema,
+          identifier.getShardSpec(),
+          identifier.getVersion(),
+          tuningConfig.getAppendableIndexSpec(),
+          tuningConfig.getMaxRowsInMemory(),
+          maxBytesTuningConfig,
+          null
+      );
+      bytesCurrentlyInMemory.addAndGet(calculateSinkMemoryInUsed());
+
+      sinks.put(identifier, retVal);
+      metrics.setSinkCount(sinks.size());
+    }
+
+    return retVal;
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, 
final Iterable<Interval> intervals)
+  {
+    throw new UnsupportedOperationException("No query runner for batch 
appenderator");
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, 
final Iterable<SegmentDescriptor> specs)
+  {
+    throw new UnsupportedOperationException("No query runner for batch 
appenderator");
+  }
+
+  @Override
+  public void clear() throws InterruptedException
+  {
+    clear(true);
+  }
+
+  private void clear(boolean removeOnDiskData)
+  {
+    // Drop commit metadata, then abandon all segments.
+    log.info("Clearing all sinks & hydrants, removing data on disk: [%s]", 
removeOnDiskData);
+    // Drop everything.
+    for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+      removeSink(entry.getKey(), entry.getValue(), removeOnDiskData);
+    }
+  }
+
+  @Override
+  public ListenableFuture<?> drop(final SegmentIdWithShardSpec identifier)
+  {
+    final Sink sink = sinks.get(identifier);
+    SinkMetadata sm = sinksMetadata.remove(identifier);
+    if (sm != null) {
+      int originalTotalRows = getTotalRowCount();
+      int rowsToDrop = sm.getNumRowsInSegment();
+      int totalRowsAfter = originalTotalRows - rowsToDrop;
+      if (totalRowsAfter < 0) {
+        log.warn("Total rows[%d] after dropping segment[%s] rows [%d]", 
totalRowsAfter, identifier, rowsToDrop);
+      }
+      totalRows.set(Math.max(totalRowsAfter, 0));
+    }
+    if (sink != null) {
+      removeSink(identifier, sink, true);
+    }
+    return Futures.immediateFuture(null);
+  }
+
+  private void persistAllAndClear()
+  {
+    // make sure sinks are cleared before push is called
+    try {
+      persistAll(null).get();
+      clear(false);
+    }
+    catch (Throwable t) {
+      throw new RE(t, "Error while persisting");
+    }
+  }
+
+  @Override
+  public ListenableFuture<Object> persistAll(@Nullable final Committer 
committer)
+  {
+    final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = 
new ArrayList<>();
+    int numPersistedRows = 0;
+    long bytesPersisted = 0L;
+    MutableInt totalHydrantsCount = new MutableInt();
+    MutableInt totalHydrantsPersistedAcrossSinks = new MutableInt();
+    final long totalSinks = sinks.size();
+    for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+      final SegmentIdWithShardSpec identifier = entry.getKey();
+      final Sink sink = entry.getValue();
+      if (sink == null) {
+        throw new ISE("No sink for identifier: %s", identifier);
+      }
+
+      final List<FireHydrant> hydrants = Lists.newArrayList(sink);
+      totalHydrantsCount.add(hydrants.size());
+      numPersistedRows += sink.getNumRowsInMemory();
+      bytesPersisted += sink.getBytesInMemory();
+
+      final int limit = sink.isWritable() ? hydrants.size() - 1 : 
hydrants.size();
+
+      // gather hydrants that have not been persisted:
+      for (FireHydrant hydrant : hydrants.subList(0, limit)) {
+        if (!hydrant.hasSwapped()) {
+          log.debug("Hydrant[%s] hasn't persisted yet, persisting. 
Segment[%s]", hydrant, identifier);
+          indexesToPersist.add(Pair.of(hydrant, identifier));
+          totalHydrantsPersistedAcrossSinks.add(1);
+        }
+      }
+
+      if (sink.swappable()) {
+        // It is swappable. Get the old one to persist it and create a new one:
+        indexesToPersist.add(Pair.of(sink.swap(), identifier));
+        totalHydrantsPersistedAcrossSinks.add(1);
+      }
+
+    }
+    log.debug("Submitting persist runnable for dataSource[%s]", 
schema.getDataSource());
+
+    if (indexesToPersist.isEmpty()) {
+      log.info("No indexes will be peristed");
+    }
+    final Stopwatch runExecStopwatch = Stopwatch.createStarted();
+    final Stopwatch persistStopwatch = Stopwatch.createStarted();
+    AtomicLong totalPersistedRows = new AtomicLong(numPersistedRows);
+
+    try {
+      for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : indexesToPersist) {
+        metrics.incrementRowOutputCount(persistHydrant(pair.lhs, pair.rhs));
+      }
+
+      log.info(
+          "Persisted in-memory data for segments: %s",
+          indexesToPersist.stream()
+                          .filter(itp -> itp.rhs != null)
+                          .map(itp -> itp.rhs.asSegmentId().toString())
+                          .distinct()
+                          .collect(Collectors.joining(", "))
+      );
+      log.info(
+          "Persisted stats: processed rows: [%d], persisted rows[%d], sinks: 
[%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across 
sinks): [%d]",
+          rowIngestionMeters.getProcessed(),
+          totalPersistedRows.get(),
+          totalSinks,
+          totalHydrantsCount.longValue(),
+          totalHydrantsPersistedAcrossSinks.longValue()
+      );
+
+    }
+    catch (Exception e) {
+      metrics.incrementFailedPersists();
+      throw e;
+    }
+    finally {
+      metrics.incrementNumPersists();
+      
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
+      persistStopwatch.stop();
+    }
+
+    final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
+    metrics.incrementPersistBackPressureMillis(startDelay);
+    if (startDelay > WARN_DELAY) {
+      log.warn("Ingestion was throttled for [%,d] millis because persists were 
pending.", startDelay);
+    }
+    runExecStopwatch.stop();
+
+    // NB: The rows are still in memory until they're done persisting, but we 
only count rows in active indexes.
+    rowsCurrentlyInMemory.addAndGet(-numPersistedRows);
+    bytesCurrentlyInMemory.addAndGet(-bytesPersisted);
+
+    log.info("Persisted rows[%,d] and bytes[%,d]", numPersistedRows, 
bytesPersisted);
+
+    return Futures.immediateFuture(null);
+  }
+
+  @Override
+  public ListenableFuture<SegmentsAndCommitMetadata> push(
+      final Collection<SegmentIdWithShardSpec> identifiers,
+      @Nullable final Committer committer,
+      final boolean useUniquePath
+  )
+  {
+
+    if (committer != null) {
+      throw new ISE("There should be no committer for batch ingestion");
+    }
+
+    // Any sinks not persisted so far will be persisted before push:
+    persistAllAndClear();
+
+    log.info("Preparing to push...");
+    final List<DataSegment> dataSegments = new ArrayList<>();
+    List<File> persistedIdentifiers = getPersistedidentifierPaths();
+    if (persistedIdentifiers == null) {
+      throw new ISE("Identifiers were persisted but could not be retrieved");
+    }
+    for (File identifier : persistedIdentifiers) {
+      Pair<SegmentIdWithShardSpec, Sink> identifiersAndSinks;
+      try {
+        identifiersAndSinks = getIdentifierAndSinkForPersistedFile(identifier);
+      }
+      catch (IOException e) {
+        throw new ISE(e, "Failed to retrieve sinks for identifier", 
identifier);

Review comment:
       ```suggestion
           throw new ISE(e, "Failed to retrieve sinks for identifier[%s]", 
identifier);
   ```

##########
File path: 
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
##########
@@ -0,0 +1,1143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.druid.data.input.Committer;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
+import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.FireHydrant;
+import org.apache.druid.segment.realtime.plumber.Sink;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+public class BatchAppenderator implements Appenderator
+{
+  public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
+  // Rough estimate of memory footprint of empty FireHydrant based on actual 
heap dumps
+  public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
+
+  private static final EmittingLogger log = new 
EmittingLogger(BatchAppenderator.class);
+  private static final int WARN_DELAY = 1000;
+  private static final String IDENTIFIER_FILE_NAME = "identifier.json";
+
+  private final String myId;
+  private final DataSchema schema;
+  private final AppenderatorConfig tuningConfig;
+  private final FireDepartmentMetrics metrics;
+  private final DataSegmentPusher dataSegmentPusher;
+  private final ObjectMapper objectMapper;
+  private final IndexIO indexIO;
+  private final IndexMerger indexMerger;
+  /**
+   * This map needs to be concurrent because it's accessed and mutated from 
multiple threads from where
+   * this Appenderator is used (and methods like {@link 
#add(SegmentIdWithShardSpec, InputRow, Supplier, boolean)} are
+   * called). It could also be accessed (but not mutated) potentially in the 
context
+   * of any thread from {@link #drop}.
+   */
+  private final ConcurrentMap<SegmentIdWithShardSpec, Sink> sinks = new 
ConcurrentHashMap<>();
+  private final long maxBytesTuningConfig;
+  private final boolean skipBytesInMemoryOverheadCheck;
+
+  /**
+   * The following sinks metadata map and associated class are the way to 
retain metadata now that sinks
+   * are being completely removed from memory after each incremental persist.
+   */
+  private final ConcurrentHashMap<SegmentIdWithShardSpec, SinkMetadata> 
sinksMetadata = new ConcurrentHashMap<>();
+
+  /**
+   * This class is used for information that needs to be kept related to Sinks 
as
+   * they are persisted and removed from memory at every incremental persist.
+   * The information is used for sanity checks and as information required
+   * for functionality, depending in the field that is used. More info about 
the
+   * fields is annotated as comments in the class
+   */
+  private static class SinkMetadata
+  {
+    /** This is used to maintain the rows in the sink accross persists of the 
sink
+    * used for functionality (i.e. to detect whether an incremental push
+    * is needed {@link AppenderatorDriverAddResult#isPushRequired(Integer, 
Long)}
+    **/
+    private int numRowsInSegment;
+    /** For sanity check as well as functionality: to make sure that all 
hydrants for a sink are restored from disk at
+     * push time and also to remember the fire hydrant "count" when persisting 
it.
+     */
+    private int numHydrants;
+
+    public SinkMetadata()
+    {
+      this(0, 0);
+    }
+
+    public SinkMetadata(int numRowsInSegment, int numHydrants)
+    {
+      this.numRowsInSegment = numRowsInSegment;
+      this.numHydrants = numHydrants;
+    }
+
+    public void addRows(int num)
+    {
+      numRowsInSegment += num;
+    }
+
+    public void addHydrants(int num)
+    {
+      numHydrants += num;
+    }
+
+    public int getNumRowsInSegment()
+    {
+      return numRowsInSegment;
+    }
+
+    public int getNumHydrants()
+    {
+      return numHydrants;
+    }
+
+  }
+
+  // This variable updated in add(), persist(), and drop()
+  private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
+  private final AtomicInteger totalRows = new AtomicInteger();
+  private final AtomicLong bytesCurrentlyInMemory = new AtomicLong();
+  private final RowIngestionMeters rowIngestionMeters;
+  private final ParseExceptionHandler parseExceptionHandler;
+
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private volatile FileLock basePersistDirLock = null;
+  private volatile FileChannel basePersistDirLockChannel = null;
+
+  /**
+   * This constructor allows the caller to provide its own 
SinkQuerySegmentWalker.
+   * <p>
+   * The sinkTimeline is set to the sink timeline of the provided 
SinkQuerySegmentWalker.
+   * If the SinkQuerySegmentWalker is null, a new sink timeline is initialized.
+   * <p>
+   * It is used by UnifiedIndexerAppenderatorsManager which allows queries on 
data associated with multiple
+   * Appenderators.
+   */
+  BatchAppenderator(
+      String id,
+      DataSchema schema,
+      AppenderatorConfig tuningConfig,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      @Nullable SinkQuerySegmentWalker sinkQuerySegmentWalker,
+      IndexIO indexIO,
+      IndexMerger indexMerger,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
+  )
+  {
+    Preconditions.checkArgument(
+        sinkQuerySegmentWalker == null,
+        "Batch appenderator does not use a versioned timeline"
+    );
+
+    this.myId = id;
+    this.schema = Preconditions.checkNotNull(schema, "schema");
+    this.tuningConfig = Preconditions.checkNotNull(tuningConfig, 
"tuningConfig");
+    this.metrics = Preconditions.checkNotNull(metrics, "metrics");
+    this.dataSegmentPusher = Preconditions.checkNotNull(dataSegmentPusher, 
"dataSegmentPusher");
+    this.objectMapper = Preconditions.checkNotNull(objectMapper, 
"objectMapper");
+    this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
+    this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
+    this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, 
"rowIngestionMeters");
+    this.parseExceptionHandler = 
Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
+
+    maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
+    skipBytesInMemoryOverheadCheck = 
tuningConfig.isSkipBytesInMemoryOverheadCheck();
+  }
+
+  @Override
+  public String getId()
+  {
+    return myId;
+  }
+
+  @Override
+  public String getDataSource()
+  {
+    return schema.getDataSource();
+  }
+
+  @Override
+  public Object startJob()
+  {
+    tuningConfig.getBasePersistDirectory().mkdirs();
+    lockBasePersistDirectory();
+    return null;
+  }
+
+  @Override
+  public AppenderatorAddResult add(
+      final SegmentIdWithShardSpec identifier,
+      final InputRow row,
+      @Nullable final Supplier<Committer> committerSupplier,
+      final boolean allowIncrementalPersists
+  ) throws IndexSizeExceededException, SegmentNotWritableException
+  {
+
+    Preconditions.checkArgument(
+        committerSupplier == null,
+        "Batch appenderator does not need a committer!"
+    );
+
+    Preconditions.checkArgument(
+        allowIncrementalPersists,
+        "Batch appenderator should always allow incremental persists!"
+    );
+
+    if (!identifier.getDataSource().equals(schema.getDataSource())) {
+      throw new IAE(
+          "Expected dataSource[%s] but was asked to insert row for 
dataSource[%s]?!",
+          schema.getDataSource(),
+          identifier.getDataSource()
+      );
+    }
+
+    final Sink sink = getOrCreateSink(identifier);
+    metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch());
+    final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
+    final int sinkRowsInMemoryAfterAdd;
+    final long bytesInMemoryBeforeAdd = sink.getBytesInMemory();
+    final long bytesInMemoryAfterAdd;
+    final IncrementalIndexAddResult addResult;
+
+    try {
+      addResult = sink.add(row, false); // allow incrememtal persis is always 
true for batch
+      sinkRowsInMemoryAfterAdd = addResult.getRowCount();
+      bytesInMemoryAfterAdd = addResult.getBytesInMemory();
+    }
+    catch (IndexSizeExceededException e) {
+      // Uh oh, we can't do anything about this! We can't persist (commit 
metadata would be out of sync) and we
+      // can't add the row (it just failed). This should never actually 
happen, though, because we check
+      // sink.canAddRow after returning from add.
+      log.error(e, "Sink for segment[%s] was unexpectedly full!", identifier);
+      throw e;
+    }
+
+    if (sinkRowsInMemoryAfterAdd < 0) {
+      throw new SegmentNotWritableException("Attempt to add row to swapped-out 
sink for segment[%s].", identifier);
+    }
+
+    if (addResult.isRowAdded()) {
+      rowIngestionMeters.incrementProcessed();
+    } else if (addResult.hasParseException()) {
+      parseExceptionHandler.handle(addResult.getParseException());
+    }
+
+    final int numAddedRows = sinkRowsInMemoryAfterAdd - 
sinkRowsInMemoryBeforeAdd;
+    rowsCurrentlyInMemory.addAndGet(numAddedRows);
+    bytesCurrentlyInMemory.addAndGet(bytesInMemoryAfterAdd - 
bytesInMemoryBeforeAdd);
+    totalRows.addAndGet(numAddedRows);
+    sinksMetadata.computeIfAbsent(identifier, unused -> new 
SinkMetadata()).addRows(numAddedRows);
+
+    boolean persist = false;
+    List<String> persistReasons = new ArrayList<>();
+
+    if (!sink.canAppendRow()) {
+      persist = true;
+      persistReasons.add("No more rows can be appended to sink");
+    }
+    if (rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
+      persist = true;
+      persistReasons.add(StringUtils.format(
+          "rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d]",
+          rowsCurrentlyInMemory.get(),
+          tuningConfig.getMaxRowsInMemory()
+      ));
+    }
+    if (bytesCurrentlyInMemory.get() >= maxBytesTuningConfig) {
+      persist = true;
+      persistReasons.add(StringUtils.format(
+          "bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]",
+          bytesCurrentlyInMemory.get(),
+          maxBytesTuningConfig
+      ));
+    }
+    if (persist) {
+      // persistAll clears rowsCurrentlyInMemory, no need to update it.
+      log.info("Incremental persist to disk because %s.", String.join(",", 
persistReasons));
+
+      long bytesToBePersisted = 0L;
+      for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+        final Sink sinkEntry = entry.getValue();
+        if (sinkEntry != null) {
+          bytesToBePersisted += sinkEntry.getBytesInMemory();
+          if (sinkEntry.swappable()) {
+            // Code for batch no longer memory maps hydrants but they still 
take memory...
+            int memoryStillInUse = calculateMemoryUsedByHydrant();
+            bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
+          }
+        }
+      }
+
+      if (!skipBytesInMemoryOverheadCheck
+          && bytesCurrentlyInMemory.get() - bytesToBePersisted > 
maxBytesTuningConfig) {
+        // We are still over maxBytesTuningConfig even after persisting.
+        // This means that we ran out of all available memory to ingest (due 
to overheads created as part of ingestion)
+        final String alertMessage = StringUtils.format(
+            "Task has exceeded safe estimated heap usage limits, failing "
+            + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: 
[%d])"
+            + "(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > 
maxBytesTuningConfig: [%d])",
+            sinks.size(),
+            sinks.values().stream().mapToInt(Iterables::size).sum(),
+            getTotalRowCount(),
+            bytesCurrentlyInMemory.get(),
+            bytesToBePersisted,
+            maxBytesTuningConfig
+        );
+        final String errorMessage = StringUtils.format(
+            "%s.\nThis can occur when the overhead from too many intermediary 
segment persists becomes to "
+            + "great to have enough space to process additional input rows. 
This check, along with metering the overhead "
+            + "of these objects to factor into the 'maxBytesInMemory' 
computation, can be disabled by setting "
+            + "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so 
might allow the task to naturally encounter "
+            + "a 'java.lang.OutOfMemoryError'). Alternatively, 
'maxBytesInMemory' can be increased which will cause an "
+            + "increase in heap footprint, but will allow for more 
intermediary segment persists to occur before "
+            + "reaching this condition.",
+            alertMessage
+        );
+        log.makeAlert(alertMessage)
+           .addData("dataSource", schema.getDataSource())
+           .emit();
+        throw new RuntimeException(errorMessage);
+      }
+
+      persistAllAndClear();
+
+    }
+    return new AppenderatorAddResult(identifier, 
sinksMetadata.get(identifier).numRowsInSegment, false);
+  }
+
+  @Override
+  public List<SegmentIdWithShardSpec> getSegments()
+  {
+    return ImmutableList.copyOf(sinks.keySet());
+  }
+
+  @Override
+  public int getRowCount(final SegmentIdWithShardSpec identifier)
+  {
+    return sinksMetadata.get(identifier).getNumRowsInSegment();
+  }
+
+  @Override
+  public int getTotalRowCount()
+  {
+    return totalRows.get();
+  }
+
+  @VisibleForTesting
+  public int getRowsInMemory()
+  {
+    return rowsCurrentlyInMemory.get();
+  }
+
+  @VisibleForTesting
+  public long getBytesCurrentlyInMemory()
+  {
+    return bytesCurrentlyInMemory.get();
+  }
+
+  @VisibleForTesting
+  public long getBytesInMemory(SegmentIdWithShardSpec identifier)
+  {
+    final Sink sink = sinks.get(identifier);
+
+    if (sink == null) {
+      return 0L; // sinks are removed after a persist
+    } else {
+      return sink.getBytesInMemory();
+    }
+  }
+
+  private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier)
+  {
+    Sink retVal = sinks.get(identifier);
+
+    if (retVal == null) {
+      retVal = new Sink(
+          identifier.getInterval(),
+          schema,
+          identifier.getShardSpec(),
+          identifier.getVersion(),
+          tuningConfig.getAppendableIndexSpec(),
+          tuningConfig.getMaxRowsInMemory(),
+          maxBytesTuningConfig,
+          null
+      );
+      bytesCurrentlyInMemory.addAndGet(calculateSinkMemoryInUsed());
+
+      sinks.put(identifier, retVal);
+      metrics.setSinkCount(sinks.size());
+    }
+
+    return retVal;
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, 
final Iterable<Interval> intervals)
+  {
+    throw new UnsupportedOperationException("No query runner for batch 
appenderator");
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, 
final Iterable<SegmentDescriptor> specs)
+  {
+    throw new UnsupportedOperationException("No query runner for batch 
appenderator");
+  }
+
+  @Override
+  public void clear() throws InterruptedException
+  {
+    clear(true);
+  }
+
+  private void clear(boolean removeOnDiskData)
+  {
+    // Drop commit metadata, then abandon all segments.
+    log.info("Clearing all sinks & hydrants, removing data on disk: [%s]", 
removeOnDiskData);
+    // Drop everything.
+    for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+      removeSink(entry.getKey(), entry.getValue(), removeOnDiskData);
+    }
+  }
+
+  @Override
+  public ListenableFuture<?> drop(final SegmentIdWithShardSpec identifier)
+  {
+    final Sink sink = sinks.get(identifier);
+    SinkMetadata sm = sinksMetadata.remove(identifier);
+    if (sm != null) {
+      int originalTotalRows = getTotalRowCount();
+      int rowsToDrop = sm.getNumRowsInSegment();
+      int totalRowsAfter = originalTotalRows - rowsToDrop;
+      if (totalRowsAfter < 0) {
+        log.warn("Total rows[%d] after dropping segment[%s] rows [%d]", 
totalRowsAfter, identifier, rowsToDrop);
+      }
+      totalRows.set(Math.max(totalRowsAfter, 0));
+    }
+    if (sink != null) {
+      removeSink(identifier, sink, true);
+    }
+    return Futures.immediateFuture(null);
+  }
+
+  private void persistAllAndClear()
+  {
+    // make sure sinks are cleared before push is called
+    try {
+      persistAll(null).get();
+      clear(false);
+    }
+    catch (Throwable t) {
+      throw new RE(t, "Error while persisting");
+    }
+  }
+
+  @Override
+  public ListenableFuture<Object> persistAll(@Nullable final Committer 
committer)
+  {
+    final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = 
new ArrayList<>();
+    int numPersistedRows = 0;
+    long bytesPersisted = 0L;
+    MutableInt totalHydrantsCount = new MutableInt();
+    MutableInt totalHydrantsPersistedAcrossSinks = new MutableInt();
+    final long totalSinks = sinks.size();
+    for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+      final SegmentIdWithShardSpec identifier = entry.getKey();
+      final Sink sink = entry.getValue();
+      if (sink == null) {
+        throw new ISE("No sink for identifier: %s", identifier);
+      }
+
+      final List<FireHydrant> hydrants = Lists.newArrayList(sink);
+      totalHydrantsCount.add(hydrants.size());
+      numPersistedRows += sink.getNumRowsInMemory();
+      bytesPersisted += sink.getBytesInMemory();
+
+      final int limit = sink.isWritable() ? hydrants.size() - 1 : 
hydrants.size();
+
+      // gather hydrants that have not been persisted:
+      for (FireHydrant hydrant : hydrants.subList(0, limit)) {
+        if (!hydrant.hasSwapped()) {
+          log.debug("Hydrant[%s] hasn't persisted yet, persisting. 
Segment[%s]", hydrant, identifier);
+          indexesToPersist.add(Pair.of(hydrant, identifier));
+          totalHydrantsPersistedAcrossSinks.add(1);
+        }
+      }
+
+      if (sink.swappable()) {
+        // It is swappable. Get the old one to persist it and create a new one:
+        indexesToPersist.add(Pair.of(sink.swap(), identifier));
+        totalHydrantsPersistedAcrossSinks.add(1);
+      }
+
+    }
+    log.debug("Submitting persist runnable for dataSource[%s]", 
schema.getDataSource());
+
+    if (indexesToPersist.isEmpty()) {
+      log.info("No indexes will be peristed");
+    }
+    final Stopwatch runExecStopwatch = Stopwatch.createStarted();
+    final Stopwatch persistStopwatch = Stopwatch.createStarted();
+    AtomicLong totalPersistedRows = new AtomicLong(numPersistedRows);
+
+    try {
+      for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : indexesToPersist) {
+        metrics.incrementRowOutputCount(persistHydrant(pair.lhs, pair.rhs));
+      }
+
+      log.info(
+          "Persisted in-memory data for segments: %s",
+          indexesToPersist.stream()
+                          .filter(itp -> itp.rhs != null)
+                          .map(itp -> itp.rhs.asSegmentId().toString())
+                          .distinct()
+                          .collect(Collectors.joining(", "))
+      );
+      log.info(
+          "Persisted stats: processed rows: [%d], persisted rows[%d], sinks: 
[%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across 
sinks): [%d]",
+          rowIngestionMeters.getProcessed(),
+          totalPersistedRows.get(),
+          totalSinks,
+          totalHydrantsCount.longValue(),
+          totalHydrantsPersistedAcrossSinks.longValue()
+      );
+
+    }
+    catch (Exception e) {
+      metrics.incrementFailedPersists();
+      throw e;
+    }
+    finally {
+      metrics.incrementNumPersists();
+      
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
+      persistStopwatch.stop();
+    }
+
+    final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
+    metrics.incrementPersistBackPressureMillis(startDelay);
+    if (startDelay > WARN_DELAY) {
+      log.warn("Ingestion was throttled for [%,d] millis because persists were 
pending.", startDelay);
+    }
+    runExecStopwatch.stop();
+
+    // NB: The rows are still in memory until they're done persisting, but we 
only count rows in active indexes.
+    rowsCurrentlyInMemory.addAndGet(-numPersistedRows);
+    bytesCurrentlyInMemory.addAndGet(-bytesPersisted);
+
+    log.info("Persisted rows[%,d] and bytes[%,d]", numPersistedRows, 
bytesPersisted);
+
+    return Futures.immediateFuture(null);
+  }
+
+  @Override
+  public ListenableFuture<SegmentsAndCommitMetadata> push(
+      final Collection<SegmentIdWithShardSpec> identifiers,
+      @Nullable final Committer committer,
+      final boolean useUniquePath
+  )
+  {
+
+    if (committer != null) {
+      throw new ISE("There should be no committer for batch ingestion");
+    }
+
+    // Any sinks not persisted so far will be persisted before push:
+    persistAllAndClear();
+
+    log.info("Preparing to push...");
+    final List<DataSegment> dataSegments = new ArrayList<>();
+    List<File> persistedIdentifiers = getPersistedidentifierPaths();
+    if (persistedIdentifiers == null) {
+      throw new ISE("Identifiers were persisted but could not be retrieved");
+    }
+    for (File identifier : persistedIdentifiers) {
+      Pair<SegmentIdWithShardSpec, Sink> identifiersAndSinks;
+      try {
+        identifiersAndSinks = getIdentifierAndSinkForPersistedFile(identifier);
+      }
+      catch (IOException e) {
+        throw new ISE(e, "Failed to retrieve sinks for identifier", 
identifier);
+      }
+      final DataSegment dataSegment = mergeAndPush(
+          identifiersAndSinks.lhs,
+          identifiersAndSinks.rhs,
+          useUniquePath
+      );
+      if (dataSegment != null) {
+        dataSegments.add(dataSegment);
+      } else {
+        log.warn("mergeAndPush[%s] returned null, skipping.", 
identifiersAndSinks.lhs);
+      }
+    }
+    log.info("Push complete...");
+
+    return Futures.immediateFuture(new SegmentsAndCommitMetadata(dataSegments, 
null));
+  }
+
+  /**
+   * Merge segment, push to deep storage. Should only be used on segments that 
have been fully persisted.
+   *
+   * @param identifier    sink identifier
+   * @param sink          sink to push
+   * @param useUniquePath true if the segment should be written to a path with 
a unique identifier
+   * @return segment descriptor, or null if the sink is no longer valid
+   */
+  @Nullable
+  private DataSegment mergeAndPush(
+      final SegmentIdWithShardSpec identifier,
+      final Sink sink,
+      final boolean useUniquePath
+  )
+  {
+
+    // Use a descriptor file to indicate that pushing has completed.
+    final File persistDir = computePersistDir(identifier);
+    final File mergedTarget = new File(persistDir, "merged");
+    final File descriptorFile = computeDescriptorFile(identifier);
+
+    // Sanity checks
+    if (sink.isWritable()) {
+      throw new ISE("Expected sink to be no longer writable before 
mergeAndPush for segment[%s].", identifier);
+    }
+
+    int numHydrants = 0;
+    for (FireHydrant hydrant : sink) {
+      synchronized (hydrant) {
+        if (!hydrant.hasSwapped()) {
+          throw new ISE("Expected sink to be fully persisted before 
mergeAndPush for segment[%s].", identifier);
+        }
+      }
+      numHydrants++;
+    }
+
+    SinkMetadata sm = sinksMetadata.get(identifier);
+    if (sm == null) {
+      log.warn("Sink metadata not found just before merge for identifier 
[%s]", identifier);
+    } else if (numHydrants != sm.getNumHydrants()) {
+      throw new ISE("Number of restored hydrants[%d] for identifier[%s] does 
not match expected value[%d]",
+                    numHydrants, identifier, 
sinksMetadata.get(identifier).getNumHydrants());
+    }
+
+    try {
+      if (descriptorFile.exists()) {
+        // Already pushed.
+
+        if (useUniquePath) {
+          // Don't reuse the descriptor, because the caller asked for a unique 
path. Leave the old one as-is, since
+          // it might serve some unknown purpose.
+          log.debug(
+              "Segment[%s] already pushed, but we want a unique path, so will 
push again with a new path.",
+              identifier
+          );
+        } else {
+          log.info("Segment[%s] already pushed, skipping.", identifier);
+          return objectMapper.readValue(descriptorFile, DataSegment.class);
+        }
+      }
+
+      removeDirectory(mergedTarget);
+
+      if (mergedTarget.exists()) {
+        throw new ISE("Merged target[%s] exists after removing?!", 
mergedTarget);
+      }
+
+      final File mergedFile;
+      final long mergeFinishTime;
+      final long startTime = System.nanoTime();
+      List<QueryableIndex> indexes = new ArrayList<>();
+      Closer closer = Closer.create();
+      try {
+        for (FireHydrant fireHydrant : sink) {
+          Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = 
fireHydrant.getAndIncrementSegment();
+          final QueryableIndex queryableIndex = 
segmentAndCloseable.lhs.asQueryableIndex();
+          log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant);
+          indexes.add(queryableIndex);
+          closer.register(segmentAndCloseable.rhs);
+        }
+
+        mergedFile = indexMerger.mergeQueryableIndex(
+            indexes,
+            schema.getGranularitySpec().isRollup(),
+            schema.getAggregators(),
+            schema.getDimensionsSpec(),
+            mergedTarget,
+            tuningConfig.getIndexSpec(),
+            tuningConfig.getSegmentWriteOutMediumFactory(),
+            tuningConfig.getMaxColumnsToMerge()
+        );
+
+        mergeFinishTime = System.nanoTime();
+
+        log.debug("Segment[%s] built in %,dms.", identifier, (mergeFinishTime 
- startTime) / 1000000);
+      }
+      catch (Throwable t) {
+        throw closer.rethrow(t);
+      }
+      finally {
+        closer.close();
+      }
+
+      // Retry pushing segments because uploading to deep storage might fail 
especially for cloud storage types
+      final DataSegment segment = RetryUtils.retry(
+          // The appenderator is currently being used for the local indexing 
task and the Kafka indexing task. For the
+          // Kafka indexing task, pushers must use unique file paths in deep 
storage in order to maintain exactly-once
+          // semantics.
+          () -> dataSegmentPusher.push(
+              mergedFile,
+              sink.getSegment()
+                  
.withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(
+                      indexes,
+                      schema.getDimensionsSpec()
+                  )),
+              useUniquePath
+          ),
+          exception -> exception instanceof Exception,
+          5
+      );
+
+      // Drop the queryable indexes behind the hydrants... they are not needed 
anymore and their
+      // mapped file references
+      // can generate OOMs during merge if enough of them are held back...
+      // agfixme: Since we cannot keep sinks due to memory growth then we have 
to add the sink metadata table and keep it up to date
+      //sinks.put(identifier,sink);
+      for (FireHydrant fireHydrant : sink) {
+        fireHydrant.swapSegment(null);
+      }
+
+      // cleanup, sink no longer needed
+      removeDirectory(computePersistDir(identifier));
+
+      final long pushFinishTime = System.nanoTime();
+
+      log.info(
+          "Segment[%s] of %,d bytes "
+          + "built from %d incremental persist(s) in %,dms; "
+          + "pushed to deep storage in %,dms. "
+          + "Load spec is: %s",
+          identifier,
+          segment.getSize(),
+          indexes.size(),
+          (mergeFinishTime - startTime) / 1000000,
+          (pushFinishTime - mergeFinishTime) / 1000000,
+          objectMapper.writeValueAsString(segment.getLoadSpec())
+      );
+
+      return segment;
+    }
+    catch (Exception e) {
+      metrics.incrementFailedHandoffs();
+      log.warn(e, "Failed to push merged index for segment[%s].", identifier);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close()
+  {
+    if (!closed.compareAndSet(false, true)) {
+      log.debug("Appenderator already closed, skipping close() call.");
+      return;
+    }
+
+    log.debug("Shutting down...");
+
+    for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+      removeSink(entry.getKey(), entry.getValue(), false);
+    }
+
+    unlockBasePersistDirectory();
+
+    // cleanup:
+    List<File> persistedIdentifiers = getPersistedidentifierPaths();
+    if (persistedIdentifiers != null) {
+      for (File identifier : persistedIdentifiers) {
+        removeDirectory(identifier);
+      }
+    }
+
+    totalRows.set(0);
+    sinksMetadata.clear();
+  }
+
+  /**
+    Nothing to do since there are no executors
+   */
+  @Override
+  public void closeNow()
+  {
+    if (!closed.compareAndSet(false, true)) {
+      log.debug("Appenderator already closed, skipping closeNow() call.");
+      return;
+    }
+
+    log.debug("Shutting down immediately...");
+  }
+
+  private void lockBasePersistDirectory()
+  {
+    if (basePersistDirLock == null) {
+      try {
+        basePersistDirLockChannel = FileChannel.open(
+            computeLockFile().toPath(),
+            StandardOpenOption.CREATE,
+            StandardOpenOption.WRITE
+        );
+
+        basePersistDirLock = basePersistDirLockChannel.tryLock();
+        if (basePersistDirLock == null) {
+          throw new ISE("Cannot acquire lock on basePersistDir: %s", 
computeLockFile());
+        }
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private void unlockBasePersistDirectory()
+  {
+    try {
+      if (basePersistDirLock != null) {
+        basePersistDirLock.release();
+        basePersistDirLockChannel.close();
+        basePersistDirLock = null;
+      }
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  @Nullable
+  public List<File> getPersistedidentifierPaths()
+  {
+
+    ArrayList<File> retVal = new ArrayList<>();
+
+    final File baseDir = tuningConfig.getBasePersistDirectory();
+    if (!baseDir.exists()) {
+      return null;
+    }
+
+    final File[] files = baseDir.listFiles();
+    if (files == null) {
+      return null;
+    }
+
+    for (File sinkDir : files) {
+      final File identifierFile = new File(sinkDir, IDENTIFIER_FILE_NAME);
+      if (!identifierFile.isFile()) {
+        // No identifier in this sinkDir; it must not actually be a sink 
directory. Skip it.
+        continue;
+      }
+      retVal.add(sinkDir);
+    }
+
+    return retVal;
+  }
+
+  private Pair<SegmentIdWithShardSpec, Sink> 
getIdentifierAndSinkForPersistedFile(File identifierPath)
+      throws IOException
+  {
+
+    final SegmentIdWithShardSpec identifier = objectMapper.readValue(
+        new File(identifierPath, IDENTIFIER_FILE_NAME),
+        SegmentIdWithShardSpec.class
+    );
+
+    // To avoid reading and listing of "merged" dir and other special files
+    final File[] sinkFiles = identifierPath.listFiles(
+        (dir, fileName) -> !(Ints.tryParse(fileName) == null)
+    );
+    if (sinkFiles == null) {
+      throw new ISE("Problem reading persisted sinks in path", identifierPath);

Review comment:
       ```suggestion
         throw new ISE("Problem reading persisted sinks in path[%s]", 
identifierPath);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to