loquisgon commented on a change in pull request #11294:
URL: https://github.com/apache/druid/pull/11294#discussion_r658528575
##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
##########
@@ -0,0 +1,1335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.druid.data.input.Committer;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
+import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.FireHydrant;
+import org.apache.druid.segment.realtime.plumber.Sink;
+import org.apache.druid.server.coordination.DataSegmentAnnouncer;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+public class BatchAppenderator implements Appenderator
+{
+  public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
+  // Rough estimate of memory footprint of empty FireHydrant based on actual heap dumps
+  public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
+
+  private static final EmittingLogger log = new EmittingLogger(BatchAppenderator.class);
+  private static final int WARN_DELAY = 1000;
+  private static final String IDENTIFIER_FILE_NAME = "identifier.json";
+
+  private final String myId;
+  private final DataSchema schema;
+  private final AppenderatorConfig tuningConfig;
+  private final FireDepartmentMetrics metrics;
+  private final DataSegmentPusher dataSegmentPusher;
+  private final ObjectMapper objectMapper;
+  private final IndexIO indexIO;
+  private final IndexMerger indexMerger;
+  /**
+   * This map needs to be concurrent because it's accessed and mutated from multiple threads: both the thread from
+   * where this Appenderator is used (and methods like
+   * {@link #add(SegmentIdWithShardSpec, InputRow, Supplier, boolean)} are called) and from {@link #persistExecutor}.
+   * It could also be accessed (but not mutated) potentially in the context of any thread from {@link #drop}.
+   */
+  private final ConcurrentMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>();
+  private final long maxBytesTuningConfig;
+  private final boolean skipBytesInMemoryOverheadCheck;
+
+  /**
+   * The following sinks metadata map and associated class are the way to retain metadata now that sinks
+   * are being completely removed from memory after each incremental persist.
+   */
+  private final ConcurrentHashMap<SegmentIdWithShardSpec, SinkMetadata> sinksMetadata = new ConcurrentHashMap<>();
+
+  /**
+   * This class holds information that needs to be kept about Sinks as
+   * they are persisted and removed from memory at every incremental persist.
+   * The information is used for sanity checks and as information required
+   * for functionality, depending on the field that is used. More info about the
+   * fields is annotated as comments in the class.
+   */
+  private static class SinkMetadata
+  {
+    /**
+     * This is used to maintain the row count for the sink across persists of the sink;
+     * used for functionality (i.e., to detect whether an incremental push
+     * is needed, {@link AppenderatorDriverAddResult#isPushRequired(Integer, Long)}).
+     */
+    private int numRowsInSegment;
+    /**
+     * For sanity check as well as functionality: to make sure that all hydrants for a sink are restored from disk at
+     * push time and also to remember the fire hydrant "count" when persisting it.
+     */
+    private int numHydrants;
+
+    public SinkMetadata()
+    {
+      this(0, 0);
+    }
+
+    public SinkMetadata(int numRowsInSegment, int numHydrants)
+    {
+      this.numRowsInSegment = numRowsInSegment;
+      this.numHydrants = numHydrants;
+    }
+
+    public void addRows(int num)
+    {
+      numRowsInSegment += num;
+    }
+
+    public void addHydrants(int num)
+    {
+      numHydrants += num;
+    }
+
+    public int getNumRowsInSegment()
+    {
+      return numRowsInSegment;
+    }
+
+    public int getNumHydrants()
+    {
+      return numHydrants;
+    }
+
+  }
+
+  // These variables are updated in add(), persist(), and drop()
+  private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
+  private final AtomicInteger totalRows = new AtomicInteger();
+  private final AtomicLong bytesCurrentlyInMemory = new AtomicLong();
+  private final RowIngestionMeters rowIngestionMeters;
+  private final ParseExceptionHandler parseExceptionHandler;
+
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private volatile ListeningExecutorService persistExecutor = null;
+  private volatile ListeningExecutorService pushExecutor = null;
+  // use an intermediate executor so that deadlock conditions can be prevented
+  // where the persist and push executors try to put tasks in each other's queues,
+  // thus creating a circular dependency
+  private volatile ListeningExecutorService intermediateTempExecutor = null;
+  private volatile FileLock basePersistDirLock = null;
+  private volatile FileChannel basePersistDirLockChannel = null;
+
+  private volatile Throwable persistError;
+
+  /**
+   * This constructor allows the caller to provide its own SinkQuerySegmentWalker.
+   * <p>
+   * The sinkTimeline is set to the sink timeline of the provided SinkQuerySegmentWalker.
+   * If the SinkQuerySegmentWalker is null, a new sink timeline is initialized.
+   * <p>
+   * It is used by UnifiedIndexerAppenderatorsManager which allows queries on data associated with multiple
+   * Appenderators.
+   */
+  BatchAppenderator(
+      String id,
+      DataSchema schema,
+      AppenderatorConfig tuningConfig,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      DataSegmentAnnouncer segmentAnnouncer,
+      @Nullable SinkQuerySegmentWalker sinkQuerySegmentWalker,
+      IndexIO indexIO,
+      IndexMerger indexMerger,
+      RowIngestionMeters rowIngestionMeters,
+      ParseExceptionHandler parseExceptionHandler
+  )
+  {
+    Preconditions.checkArgument(
+        sinkQuerySegmentWalker == null,
+        "Batch appenderator does not use a versioned timeline"
+    );
+
+    this.myId = id;
+    this.schema = Preconditions.checkNotNull(schema, "schema");
+    this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
+    this.metrics = Preconditions.checkNotNull(metrics, "metrics");
+    this.dataSegmentPusher = Preconditions.checkNotNull(dataSegmentPusher, "dataSegmentPusher");
+    this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
+    this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
+    this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
+    this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
+    this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
+
+    maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
+    skipBytesInMemoryOverheadCheck = tuningConfig.isSkipBytesInMemoryOverheadCheck();
+  }
+
+  @Override
+  public String getId()
+  {
+    return myId;
+  }
+
+  @Override
+  public String getDataSource()
+  {
+    return schema.getDataSource();
+  }
+
+  @Override
+  public Object startJob()
+  {
+    tuningConfig.getBasePersistDirectory().mkdirs();
+    lockBasePersistDirectory();
+    initializeExecutors();
+    return null;
+  }
+
+  private void throwPersistErrorIfExists()
+  {
+    if (persistError != null) {
+      throw new RE(persistError, "Error while persisting");
+    }
+  }
+
+  @Override
+  public AppenderatorAddResult add(
+      final SegmentIdWithShardSpec identifier,
+      final InputRow row,
+      @Nullable final Supplier<Committer> committerSupplier,
+      final boolean allowIncrementalPersists
+  ) throws IndexSizeExceededException, SegmentNotWritableException
+  {
+
+    throwPersistErrorIfExists();
+
+    Preconditions.checkArgument(
+        committerSupplier == null,
+        "Batch appenderator does not need a committer!"
+    );
+
+    Preconditions.checkArgument(
+        allowIncrementalPersists,
+        "Batch appenderator should always allow incremental persists!"
+    );
+
+    if (!identifier.getDataSource().equals(schema.getDataSource())) {
+      throw new IAE(
+          "Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!",
+          schema.getDataSource(),
+          identifier.getDataSource()
+      );
+    }
+
+    final Sink sink = getOrCreateSink(identifier);
+    metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch());
+    final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
+    final int sinkRowsInMemoryAfterAdd;
+    final long bytesInMemoryBeforeAdd = sink.getBytesInMemory();
+    final long bytesInMemoryAfterAdd;
+    final IncrementalIndexAddResult addResult;
+
+    try {
+      addResult = sink.add(row, false); // allowIncrementalPersists is always true for batch
+      sinkRowsInMemoryAfterAdd = addResult.getRowCount();
+      bytesInMemoryAfterAdd = addResult.getBytesInMemory();
+    }
+    catch (IndexSizeExceededException e) {
+      // Uh oh, we can't do anything about this! We can't persist (commit metadata would be out of sync) and we
+      // can't add the row (it just failed). This should never actually happen, though, because we check
+      // sink.canAddRow after returning from add.
+      log.error(e, "Sink for segment[%s] was unexpectedly full!", identifier);
+      throw e;
+    }
+
+    if (sinkRowsInMemoryAfterAdd < 0) {
+      throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
+    }
+
+    if (addResult.isRowAdded()) {
+      rowIngestionMeters.incrementProcessed();
+    } else if (addResult.hasParseException()) {
+      parseExceptionHandler.handle(addResult.getParseException());
+    }
+
+    final int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd;
+    rowsCurrentlyInMemory.addAndGet(numAddedRows);
+    bytesCurrentlyInMemory.addAndGet(bytesInMemoryAfterAdd - bytesInMemoryBeforeAdd);
+    totalRows.addAndGet(numAddedRows);
+    sinksMetadata.computeIfAbsent(identifier, Void -> new SinkMetadata()).addRows(numAddedRows);
+
+    boolean isPersistRequired = false;
+    boolean persist = false;
+    List<String> persistReasons = new ArrayList<>();
+
+    if (!sink.canAppendRow()) {
+      persist = true;
+      persistReasons.add("No more rows can be appended to sink");
+    }
+    if (rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
+      persist = true;
+      persistReasons.add(StringUtils.format(
+          "rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d]",
+          rowsCurrentlyInMemory.get(),
+          tuningConfig.getMaxRowsInMemory()
+      ));
+    }
+    if (bytesCurrentlyInMemory.get() >= maxBytesTuningConfig) {
+      persist = true;
+      persistReasons.add(StringUtils.format(
+          "bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]",
+          bytesCurrentlyInMemory.get(),
+          maxBytesTuningConfig
+      ));
+    }
+    if (persist) {
+      // persistAll clears rowsCurrentlyInMemory, no need to update it.
+      log.info("Incremental persist to disk because %s.", String.join(",", persistReasons));
+
+      long bytesToBePersisted = 0L;
+      for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+        final Sink sinkEntry = entry.getValue();
+        if (sinkEntry != null) {
+          bytesToBePersisted += sinkEntry.getBytesInMemory();
+          if (sinkEntry.swappable()) {
+            // Code for batch no longer memory-maps hydrants, but they still take memory...
+            int memoryStillInUse = calculateMemoryUsedByHydrants(sinkEntry.getCurrHydrant());
+            bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
+          }
+        }
+      }
+
+      if (!skipBytesInMemoryOverheadCheck
+          && bytesCurrentlyInMemory.get() - bytesToBePersisted > maxBytesTuningConfig) {
+        // We are still over maxBytesTuningConfig even after persisting.
+        // This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion)
+        final String alertMessage = StringUtils.format(
+            "Task has exceeded safe estimated heap usage limits, failing "
+            + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])"
+            + "(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > maxBytesTuningConfig: [%d])",
+            sinks.size(),
+            sinks.values().stream().mapToInt(Iterables::size).sum(),
+            getTotalRowCount(),
+            bytesCurrentlyInMemory.get(),
+            bytesToBePersisted,
+            maxBytesTuningConfig
+        );
+        final String errorMessage = StringUtils.format(
+            "%s.\nThis can occur when the overhead from too many intermediary segment persists becomes too "
+            + "great to have enough space to process additional input rows. This check, along with metering the overhead "
+            + "of these objects to factor into the 'maxBytesInMemory' computation, can be disabled by setting "
+            + "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so might allow the task to naturally encounter "
+            + "a 'java.lang.OutOfMemoryError'). Alternatively, 'maxBytesInMemory' can be increased which will cause an "
+            + "increase in heap footprint, but will allow for more intermediary segment persists to occur before "
+            + "reaching this condition.",
+            alertMessage
+        );
+        log.makeAlert(alertMessage)
+           .addData("dataSource", schema.getDataSource())
+           .emit();
+        throw new RuntimeException(errorMessage);
+      }
+
+      persistAllAndClear();
+
+    }
+    return new AppenderatorAddResult(identifier, sinksMetadata.get(identifier).numRowsInSegment, isPersistRequired);
+  }
+
+  @Override
+  public List<SegmentIdWithShardSpec> getSegments()
+  {
+    return ImmutableList.copyOf(sinks.keySet());
+  }
+
+  @Override
+  public int getRowCount(final SegmentIdWithShardSpec identifier)
+  {
+    return sinksMetadata.get(identifier).getNumRowsInSegment();
+  }
+
+  @Override
+  public int getTotalRowCount()
+  {
+    return totalRows.get();
+  }
+
+  @VisibleForTesting
+  public int getRowsInMemory()
+  {
+    return rowsCurrentlyInMemory.get();
+  }
+
+  @VisibleForTesting
+  public long getBytesCurrentlyInMemory()
+  {
+    return bytesCurrentlyInMemory.get();
+  }
+
+  @VisibleForTesting
+  public long getBytesInMemory(SegmentIdWithShardSpec identifier)
+  {
+    final Sink sink = sinks.get(identifier);
+
+    if (sink == null) {
+      return 0L; // sinks are removed after a persist
+    } else {
+      return sink.getBytesInMemory();
+    }
+  }
+
+  private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier)
+  {
+    Sink retVal = sinks.get(identifier);
+
+    if (retVal == null) {
+      retVal = new Sink(
+          identifier.getInterval(),
+          schema,
+          identifier.getShardSpec(),
+          identifier.getVersion(),
+          tuningConfig.getAppendableIndexSpec(),
+          tuningConfig.getMaxRowsInMemory(),
+          maxBytesTuningConfig,
+          null
+      );
+      bytesCurrentlyInMemory.addAndGet(calculateSinkMemoryInUsed(retVal));
+
+      sinks.put(identifier, retVal);
+      metrics.setSinkCount(sinks.size());
+    }
+
+    return retVal;
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
+  {
+    throw new UnsupportedOperationException("No query runner for batch appenderator");
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
+  {
+    throw new UnsupportedOperationException("No query runner for batch appenderator");
+  }
+
+  @Override
+  public void clear() throws InterruptedException
+  {
+    clear(true);
+  }
+
+  private void clear(boolean removeOnDiskData) throws InterruptedException
+  {
+    // Drop commit metadata, then abandon all segments.
+    log.info("Clearing all sinks & hydrants, removing data on disk: [%s]", removeOnDiskData);
+    try {
+      throwPersistErrorIfExists();
+      // Drop everything.
+      final List<ListenableFuture<?>> futures = new ArrayList<>();
+      for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+        futures.add(removeSink(entry.getKey(), entry.getValue(), removeOnDiskData));
+      }
+      // Await dropping.
+      Futures.allAsList(futures).get();
+    }
+    catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public ListenableFuture<?> drop(final SegmentIdWithShardSpec identifier)
+  {
+    final Sink sink = sinks.get(identifier);
+    SinkMetadata sm = sinksMetadata.remove(identifier);
+    if (sm != null) {
+      int originalTotalRows = getTotalRowCount();
+      int rowsToDrop = sm.getNumRowsInSegment();
+      int totalRowsAfter = originalTotalRows - rowsToDrop;
+      if (totalRowsAfter < 0) {
+        log.warn("Total rows[%d] after dropping segment[%s] rows [%d]", totalRowsAfter, identifier, rowsToDrop);
+      }
+      totalRows.set(Math.max(totalRowsAfter, 0));
+    }
+    if (sink != null) {
+      return removeSink(identifier, sink, true);
+    } else {
+      return Futures.immediateFuture(null);
+    }
+  }
+
+  private SegmentsAndCommitMetadata persistAllAndClear()
+  {
+    final ListenableFuture<Object> toPersist = Futures.transform(
+        persistAll(null),
+        (Function<Object, Object>) future -> future
+    );
+
+    // make sure sinks are cleared before push is called
+    final SegmentsAndCommitMetadata commitMetadata;
+    try {
+      commitMetadata = (SegmentsAndCommitMetadata) toPersist.get();
+      clear(false);
+      return commitMetadata;
+    }
+    catch (Throwable t) {
+      persistError = t;
+    }
+    return null;
+  }
+
+  @Override
+  public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
+  {
+    throwPersistErrorIfExists();
+    final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = new ArrayList<>();
+    int numPersistedRows = 0;
+    long bytesPersisted = 0L;
+    MutableInt totalHydrantsCount = new MutableInt();
+    MutableInt totalHydrantsPersistedAcrossSinks = new MutableInt();
+    SegmentIdWithShardSpec startIdentifier = null;
+    final long totalSinks = sinks.size();
+    for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
+      final SegmentIdWithShardSpec identifier = entry.getKey();
+      final Sink sink = entry.getValue();
+      if (sink == null) {
+        throw new ISE("No sink for identifier: %s", identifier);
+      }
+
+      final List<FireHydrant> hydrants = Lists.newArrayList(sink);
+      totalHydrantsCount.add(hydrants.size());
+      numPersistedRows += sink.getNumRowsInMemory();
+      bytesPersisted += sink.getBytesInMemory();
+
+      final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
+
+      // gather hydrants that have not been persisted:
+      for (FireHydrant hydrant : hydrants.subList(0, limit)) {
+        if (!hydrant.hasSwapped()) {
+          log.debug("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier);
+          indexesToPersist.add(Pair.of(hydrant, identifier));
+          totalHydrantsPersistedAcrossSinks.add(1);
+        }
+      }
+
+      if (sink.swappable()) {
+        // It is swappable. Get the old one to persist it and create a new one:
+        indexesToPersist.add(Pair.of(sink.swap(), identifier));
+        totalHydrantsPersistedAcrossSinks.add(1);
+      }
+
+    }
+    log.debug("Submitting persist runnable for dataSource[%s]", schema.getDataSource());
+
+    if (indexesToPersist.isEmpty()) {
+      log.info("No indexes will be persisted");
+    }
+    final Stopwatch runExecStopwatch = Stopwatch.createStarted();
+    final Stopwatch persistStopwatch = Stopwatch.createStarted();
+    AtomicLong totalPersistedRows = new AtomicLong(numPersistedRows);
+    final ListenableFuture<Object> future = persistExecutor.submit(

Review comment:
   removed executors
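For context on the comment above: the hunk ends where the persist work is handed to `persistExecutor`, and the class keeps separate persist/push executors plus an `intermediateTempExecutor` purely to keep the two queues from waiting on each other. Below is a minimal, hypothetical sketch (not code from this PR; `PersistSubmitSketch` and `PERSIST_WORK` are invented names) contrasting the dedicated-executor submission shown in the diff with running the persist synchronously on the calling thread, which is the direction "removed executors" points toward:

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public class PersistSubmitSketch
{
  // Hypothetical stand-in for the real persist work (invented for this sketch).
  private static final Callable<Object> PERSIST_WORK = () -> "persisted";

  // Variant shown in the diff: hand the work to a dedicated persist executor.
  // If a separate push executor ever waits on this queue while this queue waits
  // on it, the two can deadlock, which is why the original code also keeps an
  // intermediate executor in between.
  static ListenableFuture<Object> submitToDedicatedExecutor()
  {
    ListeningExecutorService persistExecutor =
        MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
    ListenableFuture<Object> future = persistExecutor.submit(PERSIST_WORK);
    persistExecutor.shutdown(); // let the queued task finish, then release the thread
    return future;
  }

  // Variant without executors: run the persist on the calling thread and wrap
  // the already-finished result, so no cross-queue waiting is possible.
  static ListenableFuture<Object> runSynchronously() throws Exception
  {
    return Futures.immediateFuture(PERSIST_WORK.call());
  }
}
```

Either way the caller still receives a `ListenableFuture`, so a method like `persistAllAndClear()` above could keep its get-then-clear sequencing; with the synchronous variant the `get()` simply returns immediately.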