clintropolis commented on code in PR #12918: URL: https://github.com/apache/druid/pull/12918#discussion_r949691565
########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java: ########## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit; + +import com.google.common.collect.Iterators; +import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.frame.allocation.MemoryAllocator; +import org.apache.druid.frame.channel.ReadableConcatFrameChannel; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.OutputChannel; +import org.apache.druid.frame.processor.OutputChannelFactory; +import org.apache.druid.frame.processor.OutputChannels; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import 
org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.counters.CounterTracker; +import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.msq.input.InputSliceReader; +import org.apache.druid.msq.input.InputSlices; +import org.apache.druid.msq.input.ReadableInput; +import org.apache.druid.msq.input.ReadableInputs; +import org.apache.druid.msq.input.external.ExternalInputSlice; +import org.apache.druid.msq.input.stage.StageInputSlice; +import org.apache.druid.msq.kernel.FrameContext; +import org.apache.druid.msq.kernel.ProcessorsAndChannels; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +public abstract class BaseLeafFrameProcessorFactory extends BaseFrameProcessorFactory +{ + private static final Logger log = new Logger(BaseLeafFrameProcessorFactory.class); + + @Override + public ProcessorsAndChannels<FrameProcessor<Long>, Long> makeProcessors( + StageDefinition stageDefinition, + int workerNumber, + List<InputSlice> inputSlices, + InputSliceReader inputSliceReader, + @Nullable Object extra, + OutputChannelFactory outputChannelFactory, + FrameContext frameContext, + int maxOutstandingProcessors, + CounterTracker counters, + Consumer<Throwable> warningPublisher + ) throws IOException + { + // BaseLeafFrameProcessorFactory is used for native Druid queries, where the following input cases can happen: + // 1) Union datasources: N nonbroadcast inputs, which are treated as one big input + // 2) Join datasources: one nonbroadcast input, N broadcast inputs + // 3) All other datasources: single input + + final int totalProcessors = InputSlices.getNumNonBroadcastReadableInputs( + inputSlices, + inputSliceReader, + 
stageDefinition.getBroadcastInputNumbers() + ); + + if (totalProcessors == 0) { + return new ProcessorsAndChannels<>(Sequences.empty(), OutputChannels.none()); + } + + final int outstandingProcessors; + + if (hasParquet(inputSlices)) { Review Comment: this one stuck out a bit while i was scanning... any thoughts on a better way to do this in the future? maybe some mechanism for an input format to indicate how much data it's going to read or something? ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.indexing; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import com.google.inject.Injector; +import com.google.inject.Key; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.processor.Bouncer; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexing.common.SegmentCacheManagerFactory; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.exec.ControllerClient; +import org.apache.druid.msq.exec.TaskDataSegmentProvider; +import org.apache.druid.msq.exec.Worker; +import org.apache.druid.msq.exec.WorkerClient; +import org.apache.druid.msq.exec.WorkerContext; +import org.apache.druid.msq.exec.WorkerMemoryParameters; +import org.apache.druid.msq.input.InputSpecs; +import org.apache.druid.msq.kernel.FrameContext; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.rpc.CoordinatorServiceClient; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocations; +import org.apache.druid.rpc.ServiceLocator; +import org.apache.druid.rpc.StandardRetryPolicy; +import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy; +import org.apache.druid.rpc.indexing.SpecificTaskServiceLocator; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.loading.SegmentCacheManager; +import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager; +import org.apache.druid.server.DruidNode; + +import 
java.io.File; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadLocalRandom; + +public class IndexerWorkerContext implements WorkerContext +{ + private static final Logger log = new Logger(IndexerWorkerContext.class); + private static final long FREQUENCY_CHECK_MILLIS = 1000; + private static final long FREQUENCY_CHECK_JITTER = 30; + + private final TaskToolbox toolbox; + private final Injector injector; + private final IndexIO indexIO; + private final TaskDataSegmentProvider dataSegmentProvider; + private final ServiceClientFactory clientFactory; + + @GuardedBy("this") + private OverlordClient overlordClient; + + @GuardedBy("this") + private ServiceLocator controllerLocator; + + public IndexerWorkerContext( + final TaskToolbox toolbox, + final Injector injector, + final IndexIO indexIO, + final TaskDataSegmentProvider dataSegmentProvider, + final ServiceClientFactory clientFactory + ) + { + this.toolbox = toolbox; + this.injector = injector; + this.indexIO = indexIO; + this.dataSegmentProvider = dataSegmentProvider; + this.clientFactory = clientFactory; + } + + public static IndexerWorkerContext createProductionInstance(final TaskToolbox toolbox, final Injector injector) + { + final IndexIO indexIO = injector.getInstance(IndexIO.class); + final CoordinatorServiceClient coordinatorServiceClient = + injector.getInstance(CoordinatorServiceClient.class).withRetryPolicy(StandardRetryPolicy.unlimited()); + final SegmentCacheManager segmentCacheManager = + injector.getInstance(SegmentCacheManagerFactory.class) + .manufacturate(new File(toolbox.getIndexingTmpDir(), "segment-fetch")); + final ServiceClientFactory serviceClientFactory = + injector.getInstance(Key.get(ServiceClientFactory.class, EscalatedGlobal.class)); + + return new IndexerWorkerContext( + toolbox, + injector, + indexIO, + new TaskDataSegmentProvider(coordinatorServiceClient, segmentCacheManager, indexIO), + serviceClientFactory + ); + } + + public TaskToolbox toolbox() + { + 
return toolbox; + } + + @Override + public ObjectMapper jsonMapper() + { + return toolbox.getJsonMapper(); + } + + @Override + public Injector injector() + { + return injector; + } + + @Override + public void registerWorker(Worker worker, Closer closer) + { + WorkerChatHandler chatHandler = new WorkerChatHandler(toolbox, worker); + toolbox.getChatHandlerProvider().register(worker.id(), chatHandler, false); + closer.register(() -> toolbox.getChatHandlerProvider().unregister(worker.id())); + closer.register(() -> { + synchronized (this) { + if (controllerLocator != null) { + controllerLocator.close(); + } + } + }); + + // Register the periodic controller checker + final ExecutorService periodicControllerCheckerExec = Execs.singleThreaded("controller-status-checker-%s"); + closer.register(periodicControllerCheckerExec::shutdownNow); + final ServiceLocator controllerLocator = makeControllerLocator(worker.task().getControllerTaskId()); + periodicControllerCheckerExec.submit(() -> controllerCheckerRunnable(controllerLocator, worker)); + } + + @VisibleForTesting + void controllerCheckerRunnable(final ServiceLocator controllerLocator, final Worker worker) + { + while (true) { + // Add some randomness to the frequency of the loop to avoid requests from simultaneously spun up tasks bunching + // up and stagger them randomly + long sleepTimeMillis = FREQUENCY_CHECK_MILLIS + ThreadLocalRandom.current().nextLong( + -FREQUENCY_CHECK_JITTER, + 2 * FREQUENCY_CHECK_JITTER + ); + final ServiceLocations controllerLocations; + try { + controllerLocations = controllerLocator.locate().get(); + } + catch (Throwable e) { + // Service locator exceptions are not recoverable. + log.noStackTrace().warn( + e, + "Periodic fetch of controller location encountered an exception. 
Worker task [%s] will exit.", + worker.id() + ); + worker.controllerFailed(); + break; + } + + if (controllerLocations.isClosed() || controllerLocations.getLocations().isEmpty()) { + log.warn( + "Periodic fetch of controller location returned [%s]. Worker task [%s] will exit.", + controllerLocations, + worker.id() + ); + worker.controllerFailed(); + break; + } + + try { + Thread.sleep(sleepTimeMillis); + } + catch (InterruptedException ignored) { + // Do nothing: an interrupt means we were shut down. Status checker should exit quietly. + } + } + } + + @Override + public File tempDir() + { + return toolbox.getIndexingTmpDir(); + } + + @Override + public ControllerClient makeControllerClient(String controllerId) + { + final ServiceLocator locator = makeControllerLocator(controllerId); + + return new IndexerControllerClient( + clientFactory.makeClient( + controllerId, + locator, + new SpecificTaskRetryPolicy(controllerId, StandardRetryPolicy.unlimited()) + ), + jsonMapper(), + locator + ); + } + + @Override + public WorkerClient makeWorkerClient() + { + // Ignore workerId parameter. The workerId is passed into each method of WorkerClient individually. + return new IndexerWorkerClient(clientFactory, makeOverlordClient(), jsonMapper()); + } + + @Override + public FrameContext frameContext(QueryDefinition queryDef, int stageNumber) + { + final int numWorkersInJvm; + + // Determine the max number of workers in JVM for memory allocations. + if (toolbox.getAppenderatorsManager() instanceof UnifiedIndexerAppenderatorsManager) { + // CliIndexer + numWorkersInJvm = injector.getInstance(WorkerConfig.class).getCapacity(); Review Comment: i wonder if longer term this makes sense to be its own config? sort of seems to be giving a double meaning, though close enough in purpose that is fine -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
