loquisgon commented on a change in pull request #11294:
URL: https://github.com/apache/druid/pull/11294#discussion_r666623991
##########
File path: server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
##########
@@ -0,0 +1,774 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.core.NoopEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.TuningConfig;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class BatchAppenderatorTester implements AutoCloseable
+{
+  public static final String DATASOURCE = "foo";
+
+  private final DataSchema schema;
+  private final AppenderatorConfig tuningConfig;
+  private final FireDepartmentMetrics metrics;
+  private final ObjectMapper objectMapper;
+  private final Appenderator appenderator;
+  private final ServiceEmitter emitter;
+
+  private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
+
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory
+  )
+  {
+    this(maxRowsInMemory, -1, null, false);
+  }
+
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory,
+      final boolean enablePushFailure
+  )
+  {
+    this(maxRowsInMemory, -1, null, enablePushFailure);
+  }
+
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory,
+      final long maxSizeInBytes,
+      final boolean enablePushFailure
+  )
+  {
+    this(maxRowsInMemory, maxSizeInBytes, null, enablePushFailure);
+  }
+
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory,
+      final long maxSizeInBytes,
+      final File basePersistDirectory,
+      final boolean enablePushFailure
+  )
+  {
+    this(
+        maxRowsInMemory,
+        maxSizeInBytes,
+        basePersistDirectory,
+        enablePushFailure,
+        new SimpleRowIngestionMeters(),
+        false,
+        false
+    );
+  }
+
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory,
+      final long maxSizeInBytes,
+      @Nullable final File basePersistDirectory,
+      final boolean enablePushFailure,
+      final RowIngestionMeters rowIngestionMeters
+  )
+  {
+    this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, rowIngestionMeters,
+         false, false
+    );
+  }
+
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory,
+      final long maxSizeInBytes,
+      @Nullable final File basePersistDirectory,
+      final boolean enablePushFailure,
+      final RowIngestionMeters rowIngestionMeters,
+      final boolean skipBytesInMemoryOverheadCheck,
+      final boolean useLegacyBatchProcessing
+  )
+  {
+    objectMapper = new DefaultObjectMapper();
+    objectMapper.registerSubtypes(LinearShardSpec.class);
+
+    final Map<String, Object> parserMap = objectMapper.convertValue(
+        new MapInputRowParser(
+            new JSONParseSpec(
+                new TimestampSpec("ts", "auto", null),
+                new DimensionsSpec(null, null, null),
+                null,
+                null,
+                null
+            )
+        ),
+        Map.class
+    );
+    schema = new DataSchema(
+        DATASOURCE,
+        parserMap,
+        new AggregatorFactory[]{
+            new CountAggregatorFactory("count"),
+            new LongSumAggregatorFactory("met", "met")
+        },
+        new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
+        null,
+        objectMapper
+    );
+    tuningConfig = new TestIndexTuningConfig(
+        null,
+        2,
+        null,
+        maxRowsInMemory,
+        maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
+        skipBytesInMemoryOverheadCheck,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+        true,
+        null,
+        null,
+        null,
+        null,
+        basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory
+    );
+    metrics = new FireDepartmentMetrics();
+
+    IndexIO indexIO = new IndexIO(
+        objectMapper,
+        () -> 0
+    );
+    IndexMerger indexMerger = new IndexMergerV9(
+        objectMapper,
+        indexIO,
+        OffHeapMemorySegmentWriteOutMediumFactory.instance()
+    );
+
+    emitter = new ServiceEmitter(
+        "test",
+        "test",
+        new NoopEmitter()
+    );
+    emitter.start();
+    EmittingLogger.registerEmitter(emitter);
+    DataSegmentPusher dataSegmentPusher = new DataSegmentPusher()
+    {
+      private boolean mustFail = true;
+
+      @Deprecated
+      @Override
+      public String getPathForHadoop(String dataSource)
+      {
+        return getPathForHadoop();
+      }
+
+      @Override
+      public String getPathForHadoop()
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
+      {
+        if (enablePushFailure && mustFail) {
+          mustFail = false;
+          throw new IOException("Push failure test");
+        } else if (enablePushFailure) {
+          mustFail = true;
+        }
+        pushedSegments.add(segment);
+        return segment;
+      }
+
+      @Override
+      public Map<String, Object> makeLoadSpec(URI uri)
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
+    appenderator = Appenderators.createOffline(
+        schema.getDataSource(),
+        schema,
+        tuningConfig,
+        metrics,
+        dataSegmentPusher,
+        objectMapper,
+        indexIO,
+        indexMerger,
+        rowIngestionMeters,
+        new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
+        useLegacyBatchProcessing
+    );
+  }
+
+  private long getDefaultMaxBytesInMemory()
+  {
+    return (Runtime.getRuntime().totalMemory()) / 3;
+  }
+
+  public DataSchema getSchema()
+  {
+    return schema;
+  }
+
+  public AppenderatorConfig getTuningConfig()
+  {
+    return tuningConfig;
+  }
+
+  public FireDepartmentMetrics getMetrics()
+  {
+    return metrics;
+  }
+
+  public ObjectMapper getObjectMapper()
+  {
+    return objectMapper;
+  }
+
+  public Appenderator getAppenderator()
+  {
+    return appenderator;
+  }
+
+  public List<DataSegment> getPushedSegments()
+  {
+    return pushedSegments;
+  }
+
+  @Override
+  public void close() throws Exception
+  {
+    appenderator.close();
+    emitter.close();
+    FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
+  }
+
+  private static File createNewBasePersistDirectory()
+  {
+    return FileUtils.createTempDir("druid-batch-persist");
+  }
+
+  // Copied from druid-indexing tests since it is not accessible from the server module.
+  // Cleaned up a little but left mostly as-is, since most of the functionality is
+  // setting defaults when passing null, and those defaults are required for the
+  // appenderator to work.
+  private static class TestIndexTuningConfig implements AppenderatorConfig
+  {
+    private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
+    private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
+    private static final boolean DEFAULT_GUARANTEE_ROLLUP = false;
+    private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
+    private static final long DEFAULT_PUSH_TIMEOUT = 0;
+
+    private final AppendableIndexSpec appendableIndexSpec;
+    private final int maxRowsInMemory;
+    private final long maxBytesInMemory;
+    private final boolean skipBytesInMemoryOverheadCheck;
+    private final int maxColumnsToMerge;
+
+    // null if all partitionsSpec-related params are null; see getGivenOrDefaultPartitionsSpec() for details.
+    @Nullable
+    private final PartitionsSpec partitionsSpec;
+    private final IndexSpec indexSpec;
+    private final IndexSpec indexSpecForIntermediatePersists;
+    private final File basePersistDirectory;
+    private final int maxPendingPersists;
+
+    private final boolean forceGuaranteedRollup;
+    private final boolean reportParseExceptions;
+    private final long pushTimeout;
+    private final boolean logParseExceptions;
+    private final int maxParseExceptions;
+    private final int maxSavedParseExceptions;
+    private final long awaitSegmentAvailabilityTimeoutMillis;
+
+    @Nullable
+    private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
+
+    @Nullable
+    private static PartitionsSpec getPartitionsSpec(
+        boolean forceGuaranteedRollup,
+        @Nullable PartitionsSpec partitionsSpec,
+        @Nullable Integer maxRowsPerSegment,
+        @Nullable Long maxTotalRows,
+        @Nullable Integer numShards,
+        @Nullable List<String> partitionDimensions
+    )
+    {
+      if (partitionsSpec == null) {
+        if (forceGuaranteedRollup) {
+          if (maxRowsPerSegment != null
+              || numShards != null
+              || (partitionDimensions != null && !partitionDimensions.isEmpty())) {
+            return new HashedPartitionsSpec(maxRowsPerSegment, numShards, partitionDimensions);
+          } else {
+            return null;
+          }
+        } else {
+          if (maxRowsPerSegment != null || maxTotalRows != null) {
+            return new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
+          } else {
+            return null;
+          }
+        }
+      } else {
+        if (forceGuaranteedRollup) {
+          if (!partitionsSpec.isForceGuaranteedRollupCompatibleType()) {
+            throw new IAE(partitionsSpec.getClass().getSimpleName() + " cannot be used for perfect rollup");
+          }
+        } else {
+          if (!(partitionsSpec instanceof DynamicPartitionsSpec)) {
+            throw new IAE("DynamicPartitionsSpec must be used for best-effort rollup");
+          }
+        }
+        return partitionsSpec;
+      }
+    }
+
+    public TestIndexTuningConfig(
+        Integer targetPartitionSize,
+        Integer maxRowsPerSegment,
+        AppendableIndexSpec appendableIndexSpec,
+        Integer maxRowsInMemory,
+        Long maxBytesInMemory,
+        Boolean skipBytesInMemoryOverheadCheck,
+        Long maxTotalRows,
+        Integer rowFlushBoundary_forBackCompatibility,
+        Integer numShards,
+        List<String> partitionDimensions,
+        PartitionsSpec partitionsSpec,
+        IndexSpec indexSpec,
+        IndexSpec indexSpecForIntermediatePersists,
+        Integer maxPendingPersists,
+        Boolean forceGuaranteedRollup,
+        Boolean reportParseExceptions,
+        Long publishTimeout,
+        Long pushTimeout,
+        SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+        Boolean logParseExceptions,
+        Integer maxParseExceptions,
+        Integer maxSavedParseExceptions,
+        Integer maxColumnsToMerge,
+        Long awaitSegmentAvailabilityTimeoutMillis,
+        File basePersistDir
+    )
+    {
+      this(
+          appendableIndexSpec,
+          maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility,
+          maxBytesInMemory != null ? maxBytesInMemory : 0,
+          skipBytesInMemoryOverheadCheck != null
+          ? skipBytesInMemoryOverheadCheck
+          : DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK,
+          getPartitionsSpec(
+              forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup,
+              partitionsSpec,
+              maxRowsPerSegment == null ? targetPartitionSize : maxRowsPerSegment,
+              maxTotalRows,
+              numShards,
+              partitionDimensions
+          ),
+          indexSpec,
+          indexSpecForIntermediatePersists,
+          maxPendingPersists,
+          forceGuaranteedRollup,
+          reportParseExceptions,
+          pushTimeout != null ? pushTimeout : publishTimeout,
+          basePersistDir,
+          segmentWriteOutMediumFactory,
+          logParseExceptions,
+          maxParseExceptions,
+          maxSavedParseExceptions,
+          maxColumnsToMerge,
+          awaitSegmentAvailabilityTimeoutMillis
+      );
+
+      Preconditions.checkArgument(
+          targetPartitionSize == null || maxRowsPerSegment == null,
+          "Can't use targetPartitionSize and maxRowsPerSegment together"
+      );
+    }
+
+    private TestIndexTuningConfig(
+        AppendableIndexSpec appendableIndexSpec,
+        Integer maxRowsInMemory,
+        Long maxBytesInMemory,
+        Boolean skipBytesInMemoryOverheadCheck,
+        @Nullable PartitionsSpec partitionsSpec,
+        IndexSpec indexSpec,
+        IndexSpec indexSpecForIntermediatePersists,
+        Integer maxPendingPersists,
+        Boolean forceGuaranteedRollup,
+        Boolean reportParseExceptions,
+        Long pushTimeout,
+        File basePersistDirectory,
+        @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+        Boolean logParseExceptions,
+        Integer maxParseExceptions,
+        Integer maxSavedParseExceptions,
+        Integer maxColumnsToMerge,
+        Long awaitSegmentAvailabilityTimeoutMillis
+    )
+    {
+      this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
+      this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
+      // initializing this to 0, it will be lazily initialized to a value
+      // @see #getMaxBytesInMemoryOrDefault()
+      this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
+      this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck == null
+                                            ? DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK
+                                            : skipBytesInMemoryOverheadCheck;
+      this.maxColumnsToMerge = maxColumnsToMerge == null
+                               ? IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE
+                               : maxColumnsToMerge;
+      this.partitionsSpec = partitionsSpec;
+      this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
+      this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null
+                                              ? this.indexSpec
+                                              : indexSpecForIntermediatePersists;
+      this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
+      this.forceGuaranteedRollup = forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup;
+      this.reportParseExceptions = reportParseExceptions == null
+                                   ? DEFAULT_REPORT_PARSE_EXCEPTIONS
+                                   : reportParseExceptions;
+      this.pushTimeout = pushTimeout == null ? DEFAULT_PUSH_TIMEOUT : pushTimeout;
+      this.basePersistDirectory = basePersistDirectory;
+
+      this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
+
+      if (this.reportParseExceptions) {
+        this.maxParseExceptions = 0;
+        this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions);
+      } else {
+        this.maxParseExceptions = maxParseExceptions == null
+                                  ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS
+                                  : maxParseExceptions;
+        this.maxSavedParseExceptions = maxSavedParseExceptions == null
+                                       ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS
+                                       : maxSavedParseExceptions;
+      }
+      this.logParseExceptions = logParseExceptions == null
+                                ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS
+                                : logParseExceptions;
+      if (awaitSegmentAvailabilityTimeoutMillis == null || awaitSegmentAvailabilityTimeoutMillis < 0) {
+        this.awaitSegmentAvailabilityTimeoutMillis = DEFAULT_AWAIT_SEGMENT_AVAILABILITY_TIMEOUT_MILLIS;
+      } else {
+        this.awaitSegmentAvailabilityTimeoutMillis = awaitSegmentAvailabilityTimeoutMillis;
+      }
+    }
+
+    @Override
+    public TestIndexTuningConfig withBasePersistDirectory(File dir)
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public AppendableIndexSpec getAppendableIndexSpec()
+    {
+      return appendableIndexSpec;
+    }
+
+    @Override
+    public int getMaxRowsInMemory()
+    {
+      return maxRowsInMemory;
+    }
+
+    @Override
+    public long getMaxBytesInMemory()
+    {
+      return maxBytesInMemory;
+    }
+
+    @Override
+    public boolean isSkipBytesInMemoryOverheadCheck()
+    {
+      return skipBytesInMemoryOverheadCheck;
+    }
+
+    @Nullable
+    @Override
+    public PartitionsSpec getPartitionsSpec()
+    {
+      return partitionsSpec;
+    }
+
+    public PartitionsSpec getGivenOrDefaultPartitionsSpec()
+    {
+      if (partitionsSpec != null) {
+        return partitionsSpec;
+      }
+      return forceGuaranteedRollup
+             ? new HashedPartitionsSpec(null, null, null)
+             : new DynamicPartitionsSpec(null, null);
+    }
+
+    @Override
+    public IndexSpec getIndexSpec()
+    {
+      return indexSpec;
+    }
+
+    @Override
+    public IndexSpec getIndexSpecForIntermediatePersists()
+    {
+      return indexSpecForIntermediatePersists;
+    }
+
+    @Override
+    public int getMaxPendingPersists()
+    {
+      return maxPendingPersists;
+    }
+
+    public boolean isForceGuaranteedRollup()
+    {
+      return forceGuaranteedRollup;
+    }
+
+    @Override
+    public boolean isReportParseExceptions()
+    {
+      return reportParseExceptions;
+    }
+
+    @Nullable
+    @Override
+    public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
+    {
+      return segmentWriteOutMediumFactory;
+    }
+
+    @Override
+    public int getMaxColumnsToMerge()
+    {
+      return maxColumnsToMerge;
+    }
+
+    public boolean isLogParseExceptions()
+    {
+      return logParseExceptions;
+    }
+
+    public int getMaxParseExceptions()
+    {
+      return maxParseExceptions;
+    }
+
+    public int getMaxSavedParseExceptions()
+    {
+      return maxSavedParseExceptions;
+    }
+
+    /**
+     * Return the max number of rows per segment. This returns null if it's not specified in tuningConfig.
+     * Deprecated in favor of {@link #getGivenOrDefaultPartitionsSpec()}.
+     */
+    @Nullable
+    @Override
+    @Deprecated

Review comment:
   Fixed

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
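For context, a minimal sketch of how this harness is meant to be driven (illustrative only, not part of the diff: the interval, version, row values, and the null committer supplier are assumptions about the batch Appenderator API, while `Intervals`, `DateTimes`, `MapBasedInputRow`, `ImmutableList`, and `ImmutableMap` are the usual Druid/Guava test utilities):

    // Sketch of a test driving BatchAppenderatorTester; assumes the offline
    // (batch) Appenderator accepts a null committer supplier.
    try (BatchAppenderatorTester tester = new BatchAppenderatorTester(100)) {
      final Appenderator appenderator = tester.getAppenderator();
      appenderator.startJob();
      // Add a single row to one segment of the test datasource "foo".
      appenderator.add(
          new SegmentIdWithShardSpec(
              BatchAppenderatorTester.DATASOURCE,
              Intervals.of("2000/2001"),
              "v1",
              new LinearShardSpec(0)
          ),
          new MapBasedInputRow(
              DateTimes.of("2000"),
              ImmutableList.of("dim"),
              ImmutableMap.of("dim", "bar", "met", 1)
          ),
          null
      );
      // Segments pushed through the push/publish flow would then be visible
      // via tester.getPushedSegments().
      appenderator.close();
    }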
