cryptoe commented on code in PR #15817:
URL: https://github.com/apache/druid/pull/15817#discussion_r1566725422
##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -142,20 +333,36 @@ public ServerView.CallbackAction
segmentSchemasAnnounced(SegmentSchemas segmentS
@Override
public void refresh(final Set<SegmentId> segmentsToRefresh, final
Set<String> dataSourcesToRebuild) throws IOException
{
+ log.debug("Segments to refresh [%s], dataSourcesToRebuild [%s]",
segmentsToRefresh, dataSourcesToRebuild);
+ final Set<SegmentId> segmentsToRefreshMinusRealtimeSegments =
filterMutableSegments(segmentsToRefresh);
+ log.debug("SegmentsToRefreshMinusRealtimeSegments [%s]",
segmentsToRefreshMinusRealtimeSegments);
+ final Set<SegmentId> segmentsToRefreshMinusCachedSegments =
filterSegmentWithCachedSchema(segmentsToRefreshMinusRealtimeSegments);
Review Comment:
```suggestion
final Set<SegmentId> segmentsToRefreshWithoutCachedAndRealtimeSegments =
filterSegmentWithCachedSchema(segmentsToRefreshMinusRealtimeSegments);
```
##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.SegmentId;
+
+import javax.annotation.Nullable;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * In-memory cache of segment schema.
+ * <p>
+ * Internally, mapping of segmentId to segment level information like schemaId
& numRows is maintained.
+ * This mapping is updated on each database poll {@code finalizedSegmentStats}.
+ * Segment schema created since last DB poll is also fetched and updated in
the cache {@code finalizedSegmentSchema}.
+ * <p>
+ * Additionally, this class caches schema for realtime segments in {@code
realtimeSegmentSchemaMap}. This mapping
+ * is cleared either when the segment is removed or marked as finalized.
+ * <p>
+ * Finalized segments which do not have their schema information present in
the DB, fetch their schema via SMQ.
+ * SMQ results are cached in {@code inTransitSMQResults}. Once the schema
information is backfilled
+ * in the DB, it is removed from {@code inTransitSMQResults} and added to
{@code inTransitSMQPublishedResults}.
+ * {@code inTransitSMQPublishedResults} is cleared on each successfull DB poll.
Review Comment:
Can you please explain in the class Javadoc why these queues are required?
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskSegmentSchemaUtil.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.druid.segment.realtime.appenderator;
+
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TaskSegmentSchemaUtil
+{
+ public static SchemaPayloadPlus getSegmentSchema(File segmentFile, IndexIO
indexIO) throws IOException
Review Comment:
Can you please add some Javadocs? This method is really core.
##########
processing/src/main/java/org/apache/druid/segment/DataSegmentWithSchemas.java:
##########
@@ -0,0 +1,71 @@
+package org.apache.druid.segment;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Encapsulates segment metadata and corresponding schema.
+ */
+public class DataSegmentWithSchemas
+{
+ private final Set<DataSegment> segments;
+ private final MinimalSegmentSchemas minimalSegmentSchemas;
+
+ public DataSegmentWithSchemas(String schemaVersion)
+ {
+ this.segments = new HashSet<>();
+ this.minimalSegmentSchemas = new MinimalSegmentSchemas(schemaVersion);
+ }
+
+ @JsonCreator
+ public DataSegmentWithSchemas(
+ @JsonProperty("segments") Set<DataSegment> segments,
+ @JsonProperty("minimalSegmentSchemas") MinimalSegmentSchemas
minimalSegmentSchemas
+ )
+ {
+ this.segments = segments;
+ this.minimalSegmentSchemas = minimalSegmentSchemas;
+ }
+
+ @JsonProperty
+ public Set<DataSegment> getSegments()
Review Comment:
Rename this to getSegmentsSnapshot().
You might have to adjust the @JsonProperty annotation so that the serialized property name is still "segments".
##########
processing/src/main/java/org/apache/druid/segment/DataSegmentWithSchemas.java:
##########
@@ -0,0 +1,71 @@
+package org.apache.druid.segment;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Encapsulates segment metadata and corresponding schema.
+ */
+public class DataSegmentWithSchemas
+{
+ private final Set<DataSegment> segments;
+ private final MinimalSegmentSchemas minimalSegmentSchemas;
+
+ public DataSegmentWithSchemas(String schemaVersion)
+ {
+ this.segments = new HashSet<>();
+ this.minimalSegmentSchemas = new MinimalSegmentSchemas(schemaVersion);
+ }
+
+ @JsonCreator
+ public DataSegmentWithSchemas(
+ @JsonProperty("segments") Set<DataSegment> segments,
+ @JsonProperty("minimalSegmentSchemas") MinimalSegmentSchemas
minimalSegmentSchemas
+ )
+ {
+ this.segments = segments;
+ this.minimalSegmentSchemas = minimalSegmentSchemas;
+ }
+
+ @JsonProperty
+ public Set<DataSegment> getSegments()
+ {
+ return segments;
+ }
+
+ @JsonProperty
+ public MinimalSegmentSchemas getMinimalSegmentSchemas()
+ {
+ return minimalSegmentSchemas;
+ }
+
+ public DataSegmentWithSchemas merge(DataSegmentWithSchemas other)
Review Comment:
This should return void.
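A minimal sketch of what a void, in-place merge could look like. It assumes merge() mutates the receiver; the quoted hunk does not show the method body, so that is a guess. `SegmentsWithSchemasSketch` is a hypothetical stand-in that uses plain strings instead of DataSegment and MinimalSegmentSchemas.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for DataSegmentWithSchemas (not the PR's class).
final class SegmentsWithSchemasSketch
{
  private final Set<String> segments = new HashSet<>();
  private final Map<String, String> schemas = new HashMap<>();

  void add(String segmentId, String schema)
  {
    segments.add(segmentId);
    schemas.put(segmentId, schema);
  }

  // Mutates this instance in place, so there is nothing useful to return.
  void merge(SegmentsWithSchemasSketch other)
  {
    segments.addAll(other.segments);
    schemas.putAll(other.schemas);
  }

  Set<String> getSegments()
  {
    return segments;
  }

  int schemaCount()
  {
    return schemas.size();
  }
}
```

Returning void makes it obvious at the call site that the receiver is modified rather than a new object being created.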
##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java:
##########
@@ -0,0 +1,165 @@
+package org.apache.druid.segment.metadata;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.metadata.SegmentSchemaManager.SegmentSchemaMetadataPlus;
+import org.apache.druid.timeline.SegmentId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class publishes the segment schema for segments obtained via segment
metadata query.
+ * It maintains a queue which is populated by {@link
CoordinatorSegmentMetadataCache}.
+ */
+@ManageLifecycle
+public class SegmentSchemaBackFillQueue
+{
+ private static final EmittingLogger log = new
EmittingLogger(SegmentSchemaBackFillQueue.class);
+ private static final int MAX_BATCH_SIZE = 500;
+ private final BlockingDeque<SegmentSchemaMetadataPlus> queue = new
LinkedBlockingDeque<>();
+ private final long executionPeriod;
+
+ private final SegmentSchemaManager segmentSchemaManager;
+ private final SegmentSchemaCache segmentSchemaCache;
+ private final ServiceEmitter emitter;
+ private final CentralizedDatasourceSchemaConfig config;
+ private ScheduledExecutorService executor;
+ private @Nullable ScheduledFuture<?> scheduledFuture = null;
+
+ @Inject
+ public SegmentSchemaBackFillQueue(
+ SegmentSchemaManager segmentSchemaManager,
+ ScheduledExecutorFactory scheduledExecutorFactory,
+ SegmentSchemaCache segmentSchemaCache,
+ ServiceEmitter emitter,
+ CentralizedDatasourceSchemaConfig config
+ )
+ {
+ this.segmentSchemaManager = segmentSchemaManager;
+ this.segmentSchemaCache = segmentSchemaCache;
+ this.emitter = emitter;
+ this.config = config;
+ this.executionPeriod = config.getBackFillPeriod();
+ if (isEnabled()) {
+ this.executor = scheduledExecutorFactory.create(1,
"SegmentSchemaBackFillQueue-%s");
+ }
+ }
+
+ @LifecycleStop
+ public void stop()
+ {
+ this.executor.shutdownNow();
+ scheduledFuture = null;
+ }
+
+ public void leaderStart()
+ {
+ if (isEnabled()) {
+ scheduledFuture = executor.scheduleAtFixedRate(this::processBatchesDue,
executionPeriod, executionPeriod, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public void leaderStop()
+ {
+ if (isEnabled()) {
+ scheduledFuture.cancel(true);
+ }
+ }
+
+ public void add(
+ SegmentId segmentId,
+ RowSignature rowSignature,
+ long numRows,
+ Map<String, AggregatorFactory> aggregators
+ )
+ {
+ SchemaPayload schemaPayload = new SchemaPayload(rowSignature, aggregators);
+ SchemaPayloadPlus schemaMetadata = new SchemaPayloadPlus(schemaPayload,
numRows);
+ queue.add(
+ new SegmentSchemaMetadataPlus(
+ segmentId,
+ segmentSchemaManager.generateSchemaPayloadFingerprint(schemaMetadata.getSchemaPayload()),
+ schemaMetadata
+ )
+ );
+ }
+
+ public boolean isEnabled()
+ {
+ return config.isEnabled() && config.isBackFillEnabled();
+ }
+
+ public void processBatchesDue()
+ {
+ if (queue.isEmpty()) {
Review Comment:
Also add a stopwatch here.
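Roughly what I have in mind, as a sketch only. It uses plain System.nanoTime() instead of a stopwatch utility class, and `TimedBatchSketch` plus the String queue are hypothetical stand-ins for the real queue and publish logic.

```java
import java.util.Queue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for SegmentSchemaBackFillQueue.processBatchesDue().
final class TimedBatchSketch
{
  // Drains the queue and returns the elapsed time in milliseconds.
  static long processBatchesDue(Queue<String> queue)
  {
    final long startNanos = System.nanoTime();
    while (!queue.isEmpty()) {
      queue.poll(); // placeholder for publishing one queued schema
    }
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
  }
}
```

The elapsed time could then be logged or emitted as a metric alongside the batch size.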
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1947,13 +1992,46 @@ public int deletePendingSegments(String dataSource)
private Set<DataSegment> announceHistoricalSegmentBatch(
final Handle handle,
final Set<DataSegment> segments,
- final Set<DataSegment> usedSegments
+ final Set<DataSegment> usedSegments,
+ @Nullable final MinimalSegmentSchemas minimalSegmentSchemas
) throws IOException
{
final Set<DataSegment> toInsertSegments = new HashSet<>();
+ Map<String, Long> fingerprintSchemaIdMap = null;
+ boolean schemaPresent = false;
try {
+ if (!publishSchema()) {
+ log.info("Task schema publish is disabled.");
+ }
+ if (publishSchema()
+ && minimalSegmentSchemas != null
Review Comment:
Please refactor this into a method like
canPublishThisSchema(schema), since it's used in two places.
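A sketch of the helper, covering only the two checks visible in the hunk above. The rest of the condition is cut off in the quoted diff and is not reproduced here. `SchemaPublishCheckSketch` and its `publishEnabled` flag are hypothetical stand-ins for the coordinator class and publishSchema().

```java
// Hypothetical stand-in; Object replaces MinimalSegmentSchemas.
final class SchemaPublishCheckSketch
{
  private final boolean publishEnabled;

  SchemaPublishCheckSketch(boolean publishEnabled)
  {
    this.publishEnabled = publishEnabled;
  }

  // Single place for the "should we publish this schema" decision.
  boolean canPublishThisSchema(Object minimalSegmentSchemas)
  {
    return publishEnabled && minimalSegmentSchemas != null;
  }
}
```

Both call sites would then call canPublishThisSchema(minimalSegmentSchemas) instead of repeating the condition.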
##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -142,20 +333,36 @@ public ServerView.CallbackAction
segmentSchemasAnnounced(SegmentSchemas segmentS
@Override
public void refresh(final Set<SegmentId> segmentsToRefresh, final
Set<String> dataSourcesToRebuild) throws IOException
{
+ log.debug("Segments to refresh [%s], dataSourcesToRebuild [%s]",
segmentsToRefresh, dataSourcesToRebuild);
+ final Set<SegmentId> segmentsToRefreshMinusRealtimeSegments =
filterMutableSegments(segmentsToRefresh);
+ log.debug("SegmentsToRefreshMinusRealtimeSegments [%s]",
segmentsToRefreshMinusRealtimeSegments);
+ final Set<SegmentId> segmentsToRefreshMinusCachedSegments =
filterSegmentWithCachedSchema(segmentsToRefreshMinusRealtimeSegments);
+ final Set<SegmentId> cachedSegments =
Sets.difference(segmentsToRefreshMinusRealtimeSegments,
segmentsToRefreshMinusCachedSegments);
+ log.debug("SegmentsToRefreshMinusCachedSegments [%s], cachedSegments
[%s]", segmentsToRefreshMinusRealtimeSegments, cachedSegments);
+
// Refresh the segments.
- final Set<SegmentId> refreshed =
refreshSegments(filterMutableSegments(segmentsToRefresh));
+ Set<SegmentId> refreshed = Collections.emptySet();
+
+ if (!config.isDisableSegmentMetadataQueries()) {
+ refreshed = refreshSegments(segmentsToRefreshMinusCachedSegments);
+ }
+
+ log.info("Refreshed segments are [%s]", refreshed);
Review Comment:
Move this log statement inside the if block, so it is only logged when a refresh actually runs.
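Something along these lines, as a sketch. `RefreshLoggingSketch` is hypothetical: the `log` list stands in for the logger, the copy of `candidates` stands in for refreshSegments(...), and `smqDisabled` stands in for config.isDisableSegmentMetadataQueries().

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the refresh() hunk above.
final class RefreshLoggingSketch
{
  static Set<String> refresh(Set<String> candidates, boolean smqDisabled, List<String> log)
  {
    Set<String> refreshed = Collections.emptySet();
    if (!smqDisabled) {
      refreshed = new HashSet<>(candidates); // stand-in for refreshSegments(...)
      // Logged only when a refresh actually ran.
      log.add("Refreshed segments are " + refreshed);
    }
    return refreshed;
  }
}
```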
##########
server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java:
##########
@@ -737,45 +728,11 @@ private Set<SegmentId> refreshSegmentsForDataSource(final
String dataSource, fin
log.warn("Got analysis for segment [%s] we didn't ask for,
ignoring.", analysis.getId());
} else {
final RowSignature rowSignature = analysisToRowSignature(analysis);
- log.debug("Segment[%s] has signature[%s].", segmentId, rowSignature);
- segmentMetadataInfo.compute(
- dataSource,
- (datasourceKey, dataSourceSegments) -> {
- if (dataSourceSegments == null) {
- // Datasource may have been removed or become unavailable
while this refresh was ongoing.
- log.warn(
- "No segment map found with datasource [%s], skipping
refresh of segment [%s]",
- datasourceKey,
- segmentId
- );
- return null;
- } else {
- dataSourceSegments.compute(
- segmentId,
- (segmentIdKey, segmentMetadata) -> {
- if (segmentMetadata == null) {
- log.warn("No segment [%s] found, skipping refresh",
segmentId);
- return null;
- } else {
- final AvailableSegmentMetadata
updatedSegmentMetadata = AvailableSegmentMetadata
- .from(segmentMetadata)
- .withRowSignature(rowSignature)
- .withNumRows(analysis.getNumRows())
- .build();
- retVal.add(segmentId);
- return updatedSegmentMetadata;
- }
- }
- );
+ log.info("Segment[%s] has signature[%s].", segmentId, rowSignature);
- if (dataSourceSegments.isEmpty()) {
- return null;
- } else {
- return dataSourceSegments;
- }
- }
- }
- );
+ if (smqAction(dataSource, segmentId, rowSignature, analysis)) {
+ retVal.add(segmentId);
Review Comment:
Let's come up with a better name than retVal here.
Maybe refreshedSegmentsWithSchemas?
##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java:
##########
@@ -0,0 +1,329 @@
+package org.apache.druid.segment.metadata;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.SegmentId;
+
+import javax.annotation.Nullable;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * In-memory cache of segment schema.
+ * <p>
+ * Internally, mapping of segmentId to segment level information like schemaId
& numRows is maintained.
+ * This mapping is updated on each database poll {@code finalizedSegmentStats}.
+ * Segment schema created since last DB poll is also fetched and updated in
the cache {@code finalizedSegmentSchema}.
+ * <p>
+ * Additionally, this class caches schema for realtime segments in {@code
realtimeSegmentSchemaMap}. This mapping
+ * is cleared either when the segment is removed or marked as finalized.
+ * <p>
+ * Finalized segments which do not have their schema information present in
the DB, fetch their schema via SMQ.
+ * SMQ results are cached in {@code inTransitSMQResults}. Once the schema
information is backfilled
+ * in the DB, it is removed from {@code inTransitSMQResults} and added to
{@code inTransitSMQPublishedResults}.
+ * {@code inTransitSMQPublishedResults} is cleared on each successfull DB poll.
+ */
+@LazySingleton
+public class SegmentSchemaCache
+{
+ private static final EmittingLogger log = new
EmittingLogger(SegmentSchemaCache.class);
+
+ // Cache is marked initialized after first DB poll.
+ private final AtomicReference<CountDownLatch> initialized = new
AtomicReference<>(new CountDownLatch(1));
+
+ /**
+ * Mapping from segmentId to segment level information which includes
numRows and schemaId.
+ * This mapping is updated on each database poll.
+ */
+ private volatile ConcurrentMap<SegmentId, SegmentStats>
finalizedSegmentStats = new ConcurrentHashMap<>();
+
+ /**
+ * Mapping from schemaId to payload. Gets updated after DB poll.
+ */
+ private volatile ConcurrentMap<Long, SchemaPayload> finalizedSegmentSchema =
new ConcurrentHashMap<>();
+
+ /**
+ * Schema information for realtime segment. This mapping is updated when
schema for realtime segment is received.
+ * The mapping is removed when the segment is either removed or marked as
finalized.
+ */
+ private final ConcurrentMap<SegmentId, SchemaPayloadPlus>
realtimeSegmentSchemaMap = new ConcurrentHashMap<>();
+
+ /**
+ * If the segment schema is fetched via SMQ, subsequently it is added here.
+ * The mapping is removed when the schema information is backfilled in the
DB.
+ */
+ private final ConcurrentMap<SegmentId, SchemaPayloadPlus>
inTransitSMQResults = new ConcurrentHashMap<>();
+
+ private final ServiceEmitter emitter;
+
+ @Inject
+ public SegmentSchemaCache(ServiceEmitter emitter)
+ {
+ this.emitter = emitter;
+ }
+
+ /**
+ * Once the schema information is backfilled in the DB, it is added here.
+ * This map is cleared after each DB poll.
+ * After the DB poll and before clearing this map it is possible that some
results were added to this map.
+ * These results would get lost after clearing this map.
+ * But, it should be fine since the schema could be retrieved if needed
using SMQ, also the schema would be available in the next poll.
+ */
+ private final ConcurrentMap<SegmentId, SchemaPayloadPlus>
inTransitSMQPublishedResults = new ConcurrentHashMap<>();
+
+ public void setInitialized()
+ {
+ log.info("[%s] initializing.", getClass().getSimpleName());
+ if (initialized.get().getCount() == 1) {
+ initialized.get().countDown();
+ log.info("[%s] is initialized.", getClass().getSimpleName());
+ }
+ }
+
+ /**
+ * Uninitialize is called when the current node is no longer the leader.
+ * The schema is cleared except for {@code realtimeSegmentSchemaMap}.
+ * Schema map continues to be updated on both the leader and follower nodes.
+ */
+ public void uninitialize()
+ {
+ log.info("[%s] is uninitializing.", getClass().getSimpleName());
+ initialized.set(new CountDownLatch(1));
+
+ finalizedSegmentSchema.clear();
+ finalizedSegmentStats.clear();
+ inTransitSMQResults.clear();
+ inTransitSMQPublishedResults.clear();
+ }
+
+ /**
+ * {@link CoordinatorSegmentMetadataCache} startup waits on the cache
initialization.
+ * This is being done to ensure that we don't execute SMQ for segment with
schema already present in the DB.
+ */
+ public void awaitInitialization() throws InterruptedException
+ {
+ initialized.get().await();
+ }
+
+ public void updateFinalizedSegmentStatsReference(ConcurrentMap<SegmentId,
SegmentStats> segmentStatsMap)
+ {
+ this.finalizedSegmentStats = segmentStatsMap;
+ }
+
+ public void addFinalizedSegmentSchema(long schemaId, SchemaPayload
schemaPayload)
Review Comment:
All of these methods should have Javadocs that point to the class-level
Javadoc for more information.
##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -177,20 +385,70 @@ public void refresh(final Set<SegmentId>
segmentsToRefresh, final Set<String> da
private Set<SegmentId> filterMutableSegments(Set<SegmentId> segmentIds)
{
- if (realtimeSegmentSchemaAnnouncement) {
- synchronized (lock) {
- segmentIds.removeAll(mutableSegments);
+ Set<SegmentId> preFilter = new HashSet<>(segmentIds);
Review Comment:
We need not create a new HashSet every time. Just remove from the original set.
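For illustration, a sketch of the in-place variant. It assumes the caller passes a mutable set and does not need the unfiltered contents afterwards. `FilterInPlaceSketch` is hypothetical, uses strings instead of SegmentId, and leaves out the locking from the original code.

```java
import java.util.Set;

// Hypothetical stand-in for filterMutableSegments().
final class FilterInPlaceSketch
{
  // Removes the mutable (realtime) segments from the given set and returns the same instance.
  static Set<String> filterMutableSegments(Set<String> segmentIds, Set<String> mutableSegments)
  {
    segmentIds.removeAll(mutableSegments);
    return segmentIds;
  }
}
```

If a before/after comparison is needed for debug logging, the size before removal can be captured without copying the whole set.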
##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java:
##########
@@ -0,0 +1,329 @@
+package org.apache.druid.segment.metadata;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.SegmentId;
+
+import javax.annotation.Nullable;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * In-memory cache of segment schema.
+ * <p>
+ * Internally, mapping of segmentId to segment level information like schemaId
& numRows is maintained.
+ * This mapping is updated on each database poll {@code finalizedSegmentStats}.
+ * Segment schema created since last DB poll is also fetched and updated in
the cache {@code finalizedSegmentSchema}.
+ * <p>
+ * Additionally, this class caches schema for realtime segments in {@code
realtimeSegmentSchemaMap}. This mapping
+ * is cleared either when the segment is removed or marked as finalized.
+ * <p>
+ * Finalized segments which do not have their schema information present in
the DB, fetch their schema via SMQ.
+ * SMQ results are cached in {@code inTransitSMQResults}. Once the schema
information is backfilled
+ * in the DB, it is removed from {@code inTransitSMQResults} and added to
{@code inTransitSMQPublishedResults}.
+ * {@code inTransitSMQPublishedResults} is cleared on each successfull DB poll.
+ */
+@LazySingleton
+public class SegmentSchemaCache
+{
+ private static final EmittingLogger log = new
EmittingLogger(SegmentSchemaCache.class);
+
+ // Cache is marked initialized after first DB poll.
+ private final AtomicReference<CountDownLatch> initialized = new
AtomicReference<>(new CountDownLatch(1));
+
+ /**
+ * Mapping from segmentId to segment level information which includes
numRows and schemaId.
+ * This mapping is updated on each database poll.
+ */
+ private volatile ConcurrentMap<SegmentId, SegmentStats>
finalizedSegmentStats = new ConcurrentHashMap<>();
+
+ /**
+ * Mapping from schemaId to payload. Gets updated after DB poll.
+ */
+ private volatile ConcurrentMap<Long, SchemaPayload> finalizedSegmentSchema =
new ConcurrentHashMap<>();
+
+ /**
+ * Schema information for realtime segment. This mapping is updated when
schema for realtime segment is received.
+ * The mapping is removed when the segment is either removed or marked as
finalized.
+ */
+ private final ConcurrentMap<SegmentId, SchemaPayloadPlus>
realtimeSegmentSchemaMap = new ConcurrentHashMap<>();
+
+ /**
+ * If the segment schema is fetched via SMQ, subsequently it is added here.
+ * The mapping is removed when the schema information is backfilled in the
DB.
+ */
+ private final ConcurrentMap<SegmentId, SchemaPayloadPlus>
inTransitSMQResults = new ConcurrentHashMap<>();
+
+ private final ServiceEmitter emitter;
+
+ @Inject
+ public SegmentSchemaCache(ServiceEmitter emitter)
+ {
+ this.emitter = emitter;
+ }
+
+ /**
+ * Once the schema information is backfilled in the DB, it is added here.
+ * This map is cleared after each DB poll.
+ * After the DB poll and before clearing this map it is possible that some
results were added to this map.
+ * These results would get lost after clearing this map.
+ * But, it should be fine since the schema could be retrieved if needed
using SMQ, also the schema would be available in the next poll.
+ */
+ private final ConcurrentMap<SegmentId, SchemaPayloadPlus>
inTransitSMQPublishedResults = new ConcurrentHashMap<>();
+
+ public void setInitialized()
+ {
+ log.info("[%s] initializing.", getClass().getSimpleName());
+ if (initialized.get().getCount() == 1) {
+ initialized.get().countDown();
+ log.info("[%s] is initialized.", getClass().getSimpleName());
+ }
+ }
+
+ /**
+ * Uninitialize is called when the current node is no longer the leader.
+ * The schema is cleared except for {@code realtimeSegmentSchemaMap}.
+ * Schema map continues to be updated on both the leader and follower nodes.
+ */
+ public void uninitialize()
+ {
+ log.info("[%s] is uninitializing.", getClass().getSimpleName());
+ initialized.set(new CountDownLatch(1));
+
+ finalizedSegmentSchema.clear();
+ finalizedSegmentStats.clear();
+ inTransitSMQResults.clear();
+ inTransitSMQPublishedResults.clear();
+ }
+
+ /**
+ * {@link CoordinatorSegmentMetadataCache} startup waits on the cache
initialization.
+ * This is being done to ensure that we don't execute SMQ for segment with
schema already present in the DB.
+ */
+ public void awaitInitialization() throws InterruptedException
+ {
+ initialized.get().await();
+ }
+
+ public void updateFinalizedSegmentStatsReference(ConcurrentMap<SegmentId,
SegmentStats> segmentStatsMap)
+ {
+ this.finalizedSegmentStats = segmentStatsMap;
+ }
+
+ public void addFinalizedSegmentSchema(long schemaId, SchemaPayload
schemaPayload)
+ {
+ finalizedSegmentSchema.put(schemaId, schemaPayload);
+ }
+
+ public void updateFinalizedSegmentSchemaReference(ConcurrentMap<Long,
SchemaPayload> schemaPayloadMap)
+ {
+ finalizedSegmentSchema = schemaPayloadMap;
+ }
+
+ public void addRealtimeSegmentSchema(SegmentId segmentId, RowSignature
rowSignature, long numRows)
+ {
+ realtimeSegmentSchemaMap.put(segmentId, new SchemaPayloadPlus(new
SchemaPayload(rowSignature), numRows));
+ }
+
+ public void addInTransitSMQResult(SegmentId segmentId, RowSignature
rowSignature, long numRows)
+ {
+ inTransitSMQResults.put(segmentId, new SchemaPayloadPlus(new
SchemaPayload(rowSignature), numRows));
+ }
+
+ /**
+ * When the SMQ result is published to the DB, it is removed from the {@code
inTransitSMQResults}
+ * and added to {@code inTransitSMQPublishedResults}.
+ */
+ public void markInTransitSMQResultPublished(SegmentId segmentId)
+ {
+ if (!inTransitSMQResults.containsKey(segmentId)) {
+ log.error("SegmentId [%s] not found in InTransitSMQResultPublished
map.", segmentId);
+ return;
+ }
+
+ inTransitSMQPublishedResults.put(segmentId,
inTransitSMQResults.get(segmentId));
+ inTransitSMQResults.remove(segmentId);
+ }
+
+ /**
+ * {@code inTransitSMQPublishedResults} is reset on each DB poll.
+ */
+ public void resetInTransitSMQResultPublishedOnDBPoll()
+ {
+ inTransitSMQPublishedResults.clear();
+ }
+
+ public Optional<SchemaPayloadPlus> getSchemaForSegment(SegmentId segmentId)
+ {
+ // We first look up the schema in the realtime map. This ensures that
during handoff
+ // there is no window where segment schema is missing from the cache.
+ // If were to look up the finalized segment map first, during handoff it
is possible
+ // that segment schema isn't polled yet and thus missing from the map and
by the time
+ // we look up the schema in the realtime map, it has been removed.
+ if (realtimeSegmentSchemaMap.containsKey(segmentId)) {
+ return Optional.of(realtimeSegmentSchemaMap.get(segmentId));
+ }
+
+ // it is important to lookup {@code inTransitSMQResults} before {@code
inTransitSMQPublishedResults}
+ // in the other way round, if a segment schema is just published it is
possible that the schema is missing
+ // in {@code inTransitSMQPublishedResults} and by the time we check {@code
inTransitSMQResults} it is removed.
+
+ // segment schema has been fetched via SMQ
+ if (inTransitSMQResults.containsKey(segmentId)) {
+ return Optional.of(inTransitSMQResults.get(segmentId));
+ }
+
+ // segment schema has been fetched via SMQ and the schema has been
published to the DB
+ if (inTransitSMQPublishedResults.containsKey(segmentId)) {
+ return Optional.of(inTransitSMQPublishedResults.get(segmentId));
+ }
+
+ // segment schema has been polled from the DB
+ if (finalizedSegmentStats.containsKey(segmentId)) {
+ SegmentStats segmentStats = finalizedSegmentStats.get(segmentId);
+ Long schemaId = segmentStats.getSchemaId();
+ if (schemaId == null || segmentStats.getNumRows() == null) {
+ log.error(
+ "Missing schemaId or numRows for segmentId [%s]. SchemaId present
[%s], numRows present [%s]",
+ segmentId, schemaId != null, segmentStats.getNumRows() != null
+ );
+ }
+
+ if (schemaId != null && finalizedSegmentSchema.containsKey(schemaId)) {
+ return Optional.of(
+ new SchemaPayloadPlus(
+ finalizedSegmentSchema.get(schemaId),
+ segmentStats.getNumRows() == null ? 0 :
segmentStats.getNumRows()
+ )
+ );
+ }
+ }
+
+ return Optional.empty();
+ }
+
+ /**
+ * Check if the schema is cached.
Review Comment:
Also add a note here: since we only poll schemas of the current version, we do
not need a version check here.
A version change always results in a Coordinator restart, so we would never
cache old versions.
##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -202,51 +460,58 @@ void updateSchemaForSegments(SegmentSchemas
segmentSchemas)
continue;
}
- log.debug("Applying schema update for segmentId [%s] datasource [%s]",
segmentId, dataSource);
+ log.info("Applying schema update for segmentId [%s] datasource [%s]",
segmentId, dataSource);
segmentMetadataInfo.compute(
dataSource,
(dataSourceKey, segmentsMap) -> {
if (segmentsMap == null) {
- segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER);
- }
- segmentsMap.compute(
- segmentId,
- (id, segmentMetadata) -> {
- if (segmentMetadata == null) {
- // By design, this case shouldn't arise since both segment
and schema is announced in the same flow
- // and messages shouldn't be lost in the poll
- // also segment announcement should always precede schema
announcement
- // and there shouldn't be any schema updated for removed
segments
- log.makeAlert("Schema update [%s] for unknown segment
[%s]", segmentSchema, segmentId).emit();
- } else {
- // We know this segment.
- Optional<RowSignature> rowSignature =
- mergeOrCreateRowSignature(
+ // Datasource may have been removed or become unavailable while
this refresh was ongoing.
+ log.info(
+ "No segment map found with datasource [%s], skipping refresh
of segment [%s]",
+ dataSourceKey,
+ segmentId
+ );
+ return null;
Review Comment:
Earlier we were returning a new skip list map, but now we are returning null.
I do not understand why.
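
For context on the behaviour change being questioned: with the JDK's
`ConcurrentMap.compute`, a remapping function that returns null for an absent
key leaves the map without an entry for that key, whereas the removed code
created an empty inner map. A minimal standalone sketch using plain JDK maps
follows; the class name and the string keys are made up for illustration and
are not Druid types.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Standalone illustration only: String stands in for datasource names and segment metadata.
class ComputeNullDemo
{
  static ConcurrentMap<String, ConcurrentSkipListMap<String, String>> run()
  {
    ConcurrentMap<String, ConcurrentSkipListMap<String, String>> info = new ConcurrentHashMap<>();

    // Old behaviour: create the inner map when the datasource is unknown.
    info.compute("wiki", (dataSource, segments) -> {
      if (segments == null) {
        return new ConcurrentSkipListMap<String, String>();
      }
      return segments;
    });

    // New behaviour: returning null for an absent key adds no entry to the outer map.
    info.compute("removedDs", (dataSource, segments) -> {
      if (segments == null) {
        return null;
      }
      return segments;
    });

    return info;
  }
}
```

After `run()`, the map holds an entry for "wiki" but none for "removedDs", so a
schema update for an unknown datasource is dropped rather than recorded.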
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java:
##########
@@ -702,37 +716,40 @@ public ListenableFuture<SegmentsAndCommitMetadata> push(
}
// push it:
- final DataSegment dataSegment = mergeAndPush(
+ final DataSegmentWithSchema dataSegmentWithSchema = mergeAndPush(
identifier,
sinkForIdentifier
);
// record it:
- if (dataSegment != null) {
- dataSegments.add(dataSegment);
+ if (dataSegmentWithSchema.getDataSegment() != null) {
+ DataSegment segment = dataSegmentWithSchema.getDataSegment();
+ dataSegments.add(segment);
+ SchemaPayloadPlus schemaPayloadPlus =
dataSegmentWithSchema.getSegmentSchemaMetadata();
+ if (schemaPayloadPlus != null) {
+ SchemaPayload schemaPayload =
schemaPayloadPlus.getSchemaPayload();
+ minimalSegmentSchemas.addSchema(
+ segment.getId().toString(),
+ fingerprintGenerator.generateFingerprint(schemaPayload),
+ schemaPayloadPlus.getNumRows(),
+ schemaPayload
+ );
+ }
} else {
log.warn("mergeAndPush[%s] returned null, skipping.",
identifier);
}
}
log.info("Push done: total sinks merged[%d], total hydrants
merged[%d]",
identifiers.size(), totalHydrantsMerged
);
- return new SegmentsAndCommitMetadata(dataSegments, commitMetadata);
+ return new SegmentsAndCommitMetadata(dataSegments, commitMetadata,
minimalSegmentSchemas);
},
pushExecutor // push it in the background, pushAndClear in
BaseAppenderatorDriver guarantees
// that segments are dropped before next add row
);
}
- /**
- * Merge segment, push to deep storage. Should only be used on segments that
have been fully persisted.
- *
- * @param identifier sink identifier
- * @param sink sink to push
- * @return segment descriptor, or null if the sink is no longer valid
- */
- @Nullable
- private DataSegment mergeAndPush(
+ private DataSegmentWithSchema mergeAndPush(
Review Comment:
The Javadoc seems to have been accidentally removed.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java:
##########
@@ -304,7 +338,10 @@ private Set<DataSegment> mergeAndPushSegments(
);
}
}
- return pushedSegments;
+ if (centralizedDatasourceSchemaConfig.isEnabled()) {
+ LOG.info("SegmentSchema for the pushed segments is [%s]",
minimalSegmentSchemas);
+ }
+ return new DataSegmentWithSchemas(pushedSegments, minimalSegmentSchemas);
Review Comment:
Let's change this code in every index task: if MinimalSchema.isEmpty(), send
null; otherwise send minimalSchema.
##########
processing/src/main/java/org/apache/druid/segment/DataSegmentWithSchemas.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Encapsulates segment metadata and corresponding schema.
+ */
+public class DataSegmentWithSchemas
+{
+ private final Set<DataSegment> segments;
+ private final MinimalSegmentSchemas minimalSegmentSchemas;
+
+ public DataSegmentWithSchemas(String schemaVersion)
+ {
+ this.segments = new HashSet<>();
+ this.minimalSegmentSchemas = new MinimalSegmentSchemas(schemaVersion);
+ }
+
+ @JsonCreator
+ public DataSegmentWithSchemas(
+ @JsonProperty("segments") Set<DataSegment> segments,
+ @JsonProperty("minimalSegmentSchemas") MinimalSegmentSchemas
minimalSegmentSchemas
+ )
+ {
+ this.segments = segments;
+ this.minimalSegmentSchemas = minimalSegmentSchemas;
+ }
+
+ @JsonProperty
+ public Set<DataSegment> getSegments()
+ {
+ return segments;
+ }
+
+ @JsonProperty
+ public MinimalSegmentSchemas getMinimalSegmentSchemas()
+ {
+ return minimalSegmentSchemas;
Review Comment:
We can return an immutable object here in a future PR.
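
A minimal sketch of one way to do that, assuming a read-only view is enough.
`SegmentsHolder` is a hypothetical stand-in for `DataSegmentWithSchemas`, and
`String` replaces `DataSegment` so the example stays self-contained.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for DataSegmentWithSchemas; not the class from this PR.
class SegmentsHolder
{
  private final Set<String> segments = new HashSet<>();

  void add(String segmentId)
  {
    segments.add(segmentId);
  }

  // Returns a read-only view: callers can iterate, but any mutation attempt
  // throws UnsupportedOperationException.
  Set<String> getSegments()
  {
    return Collections.unmodifiableSet(segments);
  }
}
```

`Collections.unmodifiableSet` wraps the set rather than copying it, so later
calls to `add` on the holder remain visible through a previously returned view.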
##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.column.RowSignature;
+import
org.apache.druid.segment.metadata.SegmentSchemaManager.SegmentSchemaMetadataPlus;
+import org.apache.druid.timeline.SegmentId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class publishes the segment schema for segments obtained via segment
metadata query.
+ * It maintains a queue which is populated by {@link
CoordinatorSegmentMetadataCache}.
+ */
+@ManageLifecycle
+public class SegmentSchemaBackFillQueue
+{
+ private static final EmittingLogger log = new
EmittingLogger(SegmentSchemaBackFillQueue.class);
+ private static final int MAX_BATCH_SIZE = 500;
+ private final BlockingDeque<SegmentSchemaMetadataPlus> queue = new
LinkedBlockingDeque<>();
+ private final long executionPeriod;
+
+ private final SegmentSchemaManager segmentSchemaManager;
+ private final SegmentSchemaCache segmentSchemaCache;
+ private final ServiceEmitter emitter;
+ private final CentralizedDatasourceSchemaConfig config;
+ private ScheduledExecutorService executor;
+ private @Nullable ScheduledFuture<?> scheduledFuture = null;
+
+ @Inject
+ public SegmentSchemaBackFillQueue(
+ SegmentSchemaManager segmentSchemaManager,
+ ScheduledExecutorFactory scheduledExecutorFactory,
+ SegmentSchemaCache segmentSchemaCache,
+ ServiceEmitter emitter,
+ CentralizedDatasourceSchemaConfig config
+ )
+ {
+ this.segmentSchemaManager = segmentSchemaManager;
+ this.segmentSchemaCache = segmentSchemaCache;
+ this.emitter = emitter;
+ this.config = config;
+ this.executionPeriod = config.getBackFillPeriod();
+ if (isEnabled()) {
+ this.executor = scheduledExecutorFactory.create(1,
"SegmentSchemaBackFillQueue-%s");
+ }
+ }
+
+ @LifecycleStop
+ public void stop()
+ {
+ this.executor.shutdownNow();
+ scheduledFuture = null;
+ }
+
+ public void leaderStart()
+ {
+ if (isEnabled()) {
+ scheduledFuture = executor.scheduleAtFixedRate(this::processBatchesDue,
executionPeriod, executionPeriod, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public void leaderStop()
+ {
+ if (isEnabled()) {
+ scheduledFuture.cancel(true);
Review Comment:
Should we also set the future to null here?
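
A minimal sketch of what that could look like, with a null guard so that
`leaderStop()` is also safe when `leaderStart()` was never called.
`LeaderScheduleDemo`, the no-op task and the 10 ms period are assumptions for
illustration; the `isEnabled()` check from the PR is left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the leader start/stop handling in SegmentSchemaBackFillQueue.
class LeaderScheduleDemo
{
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  private volatile ScheduledFuture<?> scheduledFuture = null;

  void leaderStart()
  {
    // No-op task; the real class would schedule its batch processing here.
    scheduledFuture = executor.scheduleAtFixedRate(() -> { }, 10, 10, TimeUnit.MILLISECONDS);
  }

  void leaderStop()
  {
    ScheduledFuture<?> future = scheduledFuture;
    if (future != null) {
      future.cancel(true);
      // Drop the reference so a later leaderStop() or stop() does not see a stale future.
      scheduledFuture = null;
    }
  }

  boolean isScheduled()
  {
    return scheduledFuture != null;
  }

  void shutdown()
  {
    executor.shutdownNow();
  }
}
```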
##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.SegmentId;
+
+import javax.annotation.Nullable;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * In-memory cache of segment schema.
+ * <p>
+ * Internally, mapping of segmentId to segment level information like schemaId
& numRows is maintained.
+ * This mapping is updated on each database poll {@code finalizedSegmentStats}.
+ * Segment schema created since last DB poll is also fetched and updated in
the cache {@code finalizedSegmentSchema}.
+ * <p>
+ * Additionally, this class caches schema for realtime segments in {@code
realtimeSegmentSchemaMap}. This mapping
+ * is cleared either when the segment is removed or marked as finalized.
+ * <p>
+ * Finalized segments which do not have their schema information present in
the DB, fetch their schema via SMQ.
+ * SMQ results are cached in {@code inTransitSMQResults}. Once the schema
information is backfilled
+ * in the DB, it is removed from {@code inTransitSMQResults} and added to
{@code inTransitSMQPublishedResults}.
+ * {@code inTransitSMQPublishedResults} is cleared on each successfull DB poll.
+ */
+@LazySingleton
+public class SegmentSchemaCache
+{
+ private static final EmittingLogger log = new
EmittingLogger(SegmentSchemaCache.class);
+
+ // Cache is marked initialized after first DB poll.
+ private final AtomicReference<CountDownLatch> initialized = new
AtomicReference<>(new CountDownLatch(1));
+
+ /**
+ * Mapping from segmentId to segment level information which includes
numRows and schemaId.
+ * This mapping is updated on each database poll.
+ */
+ private volatile ConcurrentMap<SegmentId, SegmentStats>
finalizedSegmentStats = new ConcurrentHashMap<>();
+
+ /**
+ * Mapping from schemaId to payload. Gets updated after DB poll.
+ */
+ private volatile ConcurrentMap<Long, SchemaPayload> finalizedSegmentSchema =
new ConcurrentHashMap<>();
+
+ /**
+ * Schema information for realtime segment. This mapping is updated when
schema for realtime segment is received.
+ * The mapping is removed when the segment is either removed or marked as
finalized.
+ */
+ private final ConcurrentMap<SegmentId, SchemaPayloadPlus>
realtimeSegmentSchemaMap = new ConcurrentHashMap<>();
+
+ /**
+ * If the segment schema is fetched via SMQ, subsequently it is added here.
+ * The mapping is removed when the schema information is backfilled in the
DB.
+ */
+ private final ConcurrentMap<SegmentId, SchemaPayloadPlus>
inTransitSMQResults = new ConcurrentHashMap<>();
+
+ private final ServiceEmitter emitter;
+
+ @Inject
+ public SegmentSchemaCache(ServiceEmitter emitter)
+ {
+ this.emitter = emitter;
+ }
+
+ /**
+ * Once the schema information is backfilled in the DB, it is added here.
+ * This map is cleared after each DB poll.
+ * After the DB poll and before clearing this map it is possible that some
results were added to this map.
+ * These results would get lost after clearing this map.
+ * But, it should be fine since the schema could be retrieved if needed
using SMQ, also the schema would be available in the next poll.
+ */
+ private final ConcurrentMap<SegmentId, SchemaPayloadPlus>
inTransitSMQPublishedResults = new ConcurrentHashMap<>();
+
+ public void setInitialized()
+ {
+ log.info("[%s] initializing.", getClass().getSimpleName());
+ if (initialized.get().getCount() == 1) {
+ initialized.get().countDown();
+ log.info("[%s] is initialized.", getClass().getSimpleName());
+ }
+ }
+
+ /**
+ * Uninitialize is called when the current node is no longer the leader.
+ * The schema is cleared except for {@code realtimeSegmentSchemaMap}.
+ * Schema map continues to be updated on both the leader and follower nodes.
+ */
+ public void uninitialize()
+ {
+ log.info("[%s] is uninitializing.", getClass().getSimpleName());
+ initialized.set(new CountDownLatch(1));
+
+ finalizedSegmentSchema.clear();
+ finalizedSegmentStats.clear();
+ inTransitSMQResults.clear();
+ inTransitSMQPublishedResults.clear();
+ }
+
+ /**
+ * {@link CoordinatorSegmentMetadataCache} startup waits on the cache
initialization.
+ * This is being done to ensure that we don't execute SMQ for segment with
schema already present in the DB.
+ */
+ public void awaitInitialization() throws InterruptedException
+ {
+ initialized.get().await();
+ }
+
+ public void updateFinalizedSegmentStatsReference(ConcurrentMap<SegmentId,
SegmentStats> segmentStatsMap)
+ {
+ this.finalizedSegmentStats = segmentStatsMap;
+ }
+
+ public void addFinalizedSegmentSchema(long schemaId, SchemaPayload
schemaPayload)
+ {
+ finalizedSegmentSchema.put(schemaId, schemaPayload);
+ }
+
+ public void updateFinalizedSegmentSchemaReference(ConcurrentMap<Long,
SchemaPayload> schemaPayloadMap)
+ {
+ finalizedSegmentSchema = schemaPayloadMap;
+ }
+
+ public void addRealtimeSegmentSchema(SegmentId segmentId, RowSignature
rowSignature, long numRows)
+ {
+ realtimeSegmentSchemaMap.put(segmentId, new SchemaPayloadPlus(new
SchemaPayload(rowSignature), numRows));
+ }
+
+ public void addInTransitSMQResult(SegmentId segmentId, RowSignature
rowSignature, long numRows)
+ {
+ inTransitSMQResults.put(segmentId, new SchemaPayloadPlus(new
SchemaPayload(rowSignature), numRows));
+ }
+
+ /**
+ * When the SMQ result is published to the DB, it is removed from the {@code
inTransitSMQResults}
+ * and added to {@code inTransitSMQPublishedResults}.
+ */
+ public void markInTransitSMQResultPublished(SegmentId segmentId)
+ {
+ if (!inTransitSMQResults.containsKey(segmentId)) {
+ log.error("SegmentId [%s] not found in InTransitSMQResultPublished
map.", segmentId);
+ return;
Review Comment:
This should not return. We should continue the operation anyway.
##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.column.RowSignature;
+import
org.apache.druid.segment.metadata.SegmentSchemaManager.SegmentSchemaMetadataPlus;
+import org.apache.druid.timeline.SegmentId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class publishes the segment schema for segments obtained via segment
metadata query.
+ * It maintains a queue which is populated by {@link
CoordinatorSegmentMetadataCache}.
+ */
+@ManageLifecycle
+public class SegmentSchemaBackFillQueue
+{
+ private static final EmittingLogger log = new
EmittingLogger(SegmentSchemaBackFillQueue.class);
+ private static final int MAX_BATCH_SIZE = 500;
+ private final BlockingDeque<SegmentSchemaMetadataPlus> queue = new
LinkedBlockingDeque<>();
+ private final long executionPeriod;
+
+ private final SegmentSchemaManager segmentSchemaManager;
+ private final SegmentSchemaCache segmentSchemaCache;
+ private final ServiceEmitter emitter;
+ private final CentralizedDatasourceSchemaConfig config;
+ private ScheduledExecutorService executor;
+ private @Nullable ScheduledFuture<?> scheduledFuture = null;
+
+ @Inject
+ public SegmentSchemaBackFillQueue(
+ SegmentSchemaManager segmentSchemaManager,
+ ScheduledExecutorFactory scheduledExecutorFactory,
+ SegmentSchemaCache segmentSchemaCache,
+ ServiceEmitter emitter,
+ CentralizedDatasourceSchemaConfig config
+ )
+ {
+ this.segmentSchemaManager = segmentSchemaManager;
+ this.segmentSchemaCache = segmentSchemaCache;
+ this.emitter = emitter;
+ this.config = config;
+ this.executionPeriod = config.getBackFillPeriod();
+ if (isEnabled()) {
+ this.executor = scheduledExecutorFactory.create(1,
"SegmentSchemaBackFillQueue-%s");
+ }
+ }
+
+ @LifecycleStop
+ public void stop()
+ {
+ this.executor.shutdownNow();
+ scheduledFuture = null;
+ }
+
+ public void leaderStart()
+ {
+ if (isEnabled()) {
+ scheduledFuture = executor.scheduleAtFixedRate(this::processBatchesDue,
executionPeriod, executionPeriod, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public void leaderStop()
+ {
+ if (isEnabled()) {
+ scheduledFuture.cancel(true);
+ }
+ }
+
+ public void add(
+ SegmentId segmentId,
+ RowSignature rowSignature,
+ long numRows,
+ Map<String, AggregatorFactory> aggregators
+ )
+ {
+ SchemaPayload schemaPayload = new SchemaPayload(rowSignature, aggregators);
+ SchemaPayloadPlus schemaMetadata = new SchemaPayloadPlus(schemaPayload,
numRows);
+ queue.add(
+ new SegmentSchemaMetadataPlus(
+ segmentId,
+
segmentSchemaManager.generateSchemaPayloadFingerprint(schemaMetadata.getSchemaPayload()),
+ schemaMetadata
+ )
+ );
+ }
+
+ public boolean isEnabled()
+ {
+ return config.isEnabled() && config.isBackFillEnabled();
+ }
+
+ public void processBatchesDue()
+ {
+ if (queue.isEmpty()) {
Review Comment:
Let's wrap this method in a try-catch, which will ensure the thread never dies.
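
The reason this matters: per the `ScheduledExecutorService.scheduleAtFixedRate`
contract, if any execution of the task throws, subsequent executions are
suppressed. A minimal sketch of the guard follows; `GuardedTaskDemo`, the
simulated failure and the 5 ms period are assumptions for illustration, not
code from this PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a periodic task that always fails but keeps being rescheduled.
class GuardedTaskDemo
{
  // Returns true if the failing task still ran at least `runs` times within 5 seconds.
  static boolean runsDespiteFailures(int runs)
  {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch latch = new CountDownLatch(runs);
    try {
      executor.scheduleAtFixedRate(
          () -> {
            try {
              latch.countDown();
              throw new IllegalStateException("simulated failure while processing a batch");
            }
            catch (Exception e) {
              // The real class would log or alert here. Swallowing the exception
              // keeps the periodic schedule alive.
            }
          },
          0,
          5,
          TimeUnit.MILLISECONDS
      );
      return latch.await(5, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    finally {
      executor.shutdownNow();
    }
  }
}
```

Without the inner try-catch, the first `IllegalStateException` would end the
schedule and the latch would never reach zero for `runs` greater than one.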
##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1088,6 +1123,174 @@ public DataSegment map(int index, ResultSet r,
StatementContext ctx) throws SQLE
);
}
+ private void doPollSegmentAndSchema()
+ {
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+ log.info("Starting polling of segment and schema table. latestSchemaId is
[%s].", latestSchemaId.get());
+
+ ConcurrentMap<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats =
new ConcurrentHashMap<>();
+
+ // some databases such as PostgreSQL require auto-commit turned off
+ // to stream results back, enabling transactions disables auto-commit
+ //
+ // setting connection to read-only will allow some database such as MySQL
+ // to automatically use read-only transaction mode, further optimizing the
query
+ final List<DataSegment> segments = connector.inReadOnlyTransaction(
+ new TransactionCallback<List<DataSegment>>()
+ {
+ @Override
+ public List<DataSegment> inTransaction(Handle handle,
TransactionStatus status)
+ {
+ return handle
+ .createQuery(StringUtils.format("SELECT payload, schema_id,
num_rows FROM %s WHERE used=true", getSegmentsTable()))
+ .setFetchSize(connector.getStreamingFetchSize())
+ .map(
+ new ResultSetMapper<DataSegment>()
+ {
+ @Override
+ public DataSegment map(int index, ResultSet r,
StatementContext ctx) throws SQLException
+ {
+ try {
+ DataSegment segment =
jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+ segmentStats.put(
+ segment.getId(),
+ new SegmentSchemaCache.SegmentStats(
+ (Long) r.getObject("schema_id"),
+ (Long) r.getObject("num_rows")
+ )
+ );
+ return replaceWithExistingSegmentIfPresent(segment);
+ }
+ catch (IOException e) {
+ log.makeAlert(e, "Failed to read segment from
db.").emit();
+ // If one entry in database is corrupted doPoll()
should continue to work overall. See
+ // filter by `Objects::nonNull` below in this method.
+ return null;
+ }
+ }
+ }
+ )
+ .list();
+ }
+ }
+ );
+
+ ConcurrentMap<Long, SchemaPayload> schemaMap = new ConcurrentHashMap<>();
+
+ String schemaPollQuery;
+
+ Long lastSchemaIdPrePoll = latestSchemaId.get();
+ if (lastSchemaIdPrePoll == null) {
+ log.info("Executing full schema refresh.");
+ schemaPollQuery =
+ StringUtils.format(
+ "SELECT id, payload FROM %s WHERE version = '%s'",
+ getSegmentSchemaTable(),
+ CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+ );
+ } else {
+ schemaPollQuery = StringUtils.format(
+ "SELECT id, payload FROM %s WHERE version = '%s' AND id > %s",
+ getSegmentSchemaTable(),
+ CentralizedDatasourceSchemaConfig.SCHEMA_VERSION,
+ lastSchemaIdPrePoll
+ );
+ }
+ String finalSchemaPollQuery = schemaPollQuery;
+
+ final AtomicReference<Long> maxPolledId = new AtomicReference<>();
+ maxPolledId.set(lastSchemaIdPrePoll);
+
+ connector.inReadOnlyTransaction(new TransactionCallback<Object>()
+ {
+ @Override
+ public Object inTransaction(Handle handle, TransactionStatus status)
+ {
+ return handle.createQuery(finalSchemaPollQuery)
+ .setFetchSize(connector.getStreamingFetchSize())
+ .map(new ResultSetMapper<Void>()
+ {
+ @Override
+ public Void map(int index, ResultSet r,
StatementContext ctx) throws SQLException
+ {
+ try {
+ long id = r.getLong("id");
+
+ if (maxPolledId.get() == null || id >
maxPolledId.get()) {
+ maxPolledId.set(id);
+ }
+
+ schemaMap.put(
+ r.getLong("id"),
+ jsonMapper.readValue(r.getBytes("payload"),
SchemaPayload.class)
+ );
+ }
+ catch (IOException e) {
+ log.makeAlert(e, "Failed to read schema from
db.").emit();
+ }
+ return null;
+ }
+ }).list();
+ }
+ });
+
+ log.debug("SchemaMap polled from the database is [%s]", schemaMap);
+
+ if (lastSchemaIdPrePoll == null) {
+ // full refresh
+ segmentSchemaCache.updateFinalizedSegmentSchemaReference(schemaMap);
+ } else {
+ // delta update
+ schemaMap.forEach(segmentSchemaCache::addFinalizedSegmentSchema);
+ }
+ segmentSchemaCache.updateFinalizedSegmentStatsReference(segmentStats);
+ segmentSchemaCache.resetInTransitSMQResultPublishedOnDBPoll();
+
+ if (lastSchemaIdPrePoll == null) {
+ segmentSchemaCache.setInitialized();
+ }
+
+ // It is possible that the schema cleanup duty resets the {@code
latestSchemaId}.
+ // In that case, we shouldn't update this value with the latest polled
schemaId.
+ latestSchemaId.compareAndSet(lastSchemaIdPrePoll, maxPolledId.get());
+
+ segmentSchemaCache.emitStats();
Review Comment:
We are calling reset and then emitting stats.
Let's get the numbers in the transaction block and use them to emit stats.
For that, SegmentSchemaCache would need to expose a method that returns a
stats object.
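
A minimal sketch of the idea, assuming the stats are just map sizes captured
before the reset. `StatsSnapshotDemo`, `CacheStats` and `getStats()` are
hypothetical names, and `String` replaces the Druid key and value types.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for SegmentSchemaCache; not the class from this PR.
class StatsSnapshotDemo
{
  // Immutable snapshot of the map sizes at one point in time.
  static final class CacheStats
  {
    final int inTransitCount;
    final int publishedCount;

    CacheStats(int inTransitCount, int publishedCount)
    {
      this.inTransitCount = inTransitCount;
      this.publishedCount = publishedCount;
    }
  }

  private final ConcurrentMap<String, String> inTransit = new ConcurrentHashMap<>();
  private final ConcurrentMap<String, String> published = new ConcurrentHashMap<>();

  void addInTransit(String segmentId, String schema)
  {
    inTransit.put(segmentId, schema);
  }

  void markPublished(String segmentId)
  {
    String schema = inTransit.remove(segmentId);
    if (schema != null) {
      published.put(segmentId, schema);
    }
  }

  // Capture the counts first; the caller emits them after the reset.
  CacheStats getStats()
  {
    return new CacheStats(inTransit.size(), published.size());
  }

  void resetPublished()
  {
    published.clear();
  }
}
```

The poll would then call `getStats()` before `resetPublished()` and emit the
captured values, so the published count is not always reported as zero.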
##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.column.RowSignature;
+import
org.apache.druid.segment.metadata.SegmentSchemaManager.SegmentSchemaMetadataPlus;
+import org.apache.druid.timeline.SegmentId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class publishes the segment schema for segments obtained via segment
metadata query.
+ * It maintains a queue which is populated by {@link
CoordinatorSegmentMetadataCache}.
+ */
+@ManageLifecycle
+public class SegmentSchemaBackFillQueue
+{
+ private static final EmittingLogger log = new
EmittingLogger(SegmentSchemaBackFillQueue.class);
+ private static final int MAX_BATCH_SIZE = 500;
+ private final BlockingDeque<SegmentSchemaMetadataPlus> queue = new
LinkedBlockingDeque<>();
Review Comment:
Since we have the cacheExec thread pushing data into this and the current
scheduledExecService reading from this, can you please put this in a comment
here so that future readers can know the flow without digging in?
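To illustrate the kind of comment meant here, a minimal sketch follows. It is not the PR's code: the class `BackfillFlowSketch`, the methods `enqueue`, `drainBatch` and `pending`, and the use of `String` in place of `SegmentSchemaMetadataPlus` are all hypothetical, chosen only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical illustration only; names and element type are assumptions.
public class BackfillFlowSketch
{
  private static final int MAX_BATCH_SIZE = 500;

  // Producer: the cache refresh thread adds entries via enqueue().
  // Consumer: the scheduled executor thread removes entries via drainBatch().
  // A BlockingDeque is thread-safe, so no extra locking is needed for this hand-off.
  private final BlockingDeque<String> queue = new LinkedBlockingDeque<>();

  // Called from the producer thread.
  public void enqueue(String segmentId)
  {
    queue.add(segmentId);
  }

  // Called from the consumer thread; removes at most MAX_BATCH_SIZE entries in FIFO order.
  public List<String> drainBatch()
  {
    final List<String> batch = new ArrayList<>();
    queue.drainTo(batch, MAX_BATCH_SIZE);
    return batch;
  }

  // Number of entries still waiting to be processed.
  public int pending()
  {
    return queue.size();
  }
}
```

The point is only that the two-line comment above the field names both threads, so a reader does not have to trace the callers.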
##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -142,20 +333,36 @@ public ServerView.CallbackAction
segmentSchemasAnnounced(SegmentSchemas segmentS
@Override
public void refresh(final Set<SegmentId> segmentsToRefresh, final
Set<String> dataSourcesToRebuild) throws IOException
{
+ log.debug("Segments to refresh [%s], dataSourcesToRebuild [%s]",
segmentsToRefresh, dataSourcesToRebuild);
+ final Set<SegmentId> segmentsToRefreshMinusRealtimeSegments =
filterMutableSegments(segmentsToRefresh);
+ log.debug("SegmentsToRefreshMinusRealtimeSegments [%s]",
segmentsToRefreshMinusRealtimeSegments);
+ final Set<SegmentId> segmentsToRefreshMinusCachedSegments =
filterSegmentWithCachedSchema(segmentsToRefreshMinusRealtimeSegments);
Review Comment:
Mutate the object and return the cached list.
##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.SegmentId;
+
+import javax.annotation.Nullable;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * In-memory cache of segment schema.
+ * <p>
+ * Internally, mapping of segmentId to segment level information like schemaId
& numRows is maintained.
+ * This mapping is updated on each database poll {@code finalizedSegmentStats}.
+ * Segment schema created since last DB poll is also fetched and updated in
the cache {@code finalizedSegmentSchema}.
+ * <p>
+ * Additionally, this class caches schema for realtime segments in {@code
realtimeSegmentSchemaMap}. This mapping
+ * is cleared either when the segment is removed or marked as finalized.
+ * <p>
+ * Finalized segments which do not have their schema information present in
the DB, fetch their schema via SMQ.
+ * SMQ results are cached in {@code inTransitSMQResults}. Once the schema
information is backfilled
+ * in the DB, it is removed from {@code inTransitSMQResults} and added to
{@code inTransitSMQPublishedResults}.
+ * {@code inTransitSMQPublishedResults} is cleared on each successfull DB poll.
Review Comment:
Also add the callers of this class and how they use it.
##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.column.RowSignature;
+import
org.apache.druid.segment.metadata.SegmentSchemaManager.SegmentSchemaMetadataPlus;
+import org.apache.druid.timeline.SegmentId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class publishes the segment schema for segments obtained via segment
metadata query.
+ * It maintains a queue which is populated by {@link
CoordinatorSegmentMetadataCache}.
+ */
+@ManageLifecycle
+public class SegmentSchemaBackFillQueue
+{
+ private static final EmittingLogger log = new
EmittingLogger(SegmentSchemaBackFillQueue.class);
+ private static final int MAX_BATCH_SIZE = 500;
+ private final BlockingDeque<SegmentSchemaMetadataPlus> queue = new
LinkedBlockingDeque<>();
+ private final long executionPeriod;
+
+ private final SegmentSchemaManager segmentSchemaManager;
+ private final SegmentSchemaCache segmentSchemaCache;
+ private final ServiceEmitter emitter;
+ private final CentralizedDatasourceSchemaConfig config;
+ private ScheduledExecutorService executor;
Review Comment:
This should be final.
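A minimal sketch of one way to do that, assuming the field is only ever assigned in the constructor (the class `FinalExecutorSketch`, the `enabled` flag, and the helper methods are hypothetical and not taken from the PR):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical illustration only; not the PR's class.
public class FinalExecutorSketch
{
  // final: assigned exactly once, in the constructor.
  private final ScheduledExecutorService executor;

  public FinalExecutorSketch(boolean enabled)
  {
    // A single conditional expression assigns the field on every path,
    // which is what allows it to be declared final.
    this.executor = enabled ? Executors.newSingleThreadScheduledExecutor() : null;
  }

  public boolean hasExecutor()
  {
    return executor != null;
  }

  public void stop()
  {
    if (executor != null) {
      executor.shutdownNow();
    }
  }
}
```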
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskSegmentSchemaUtil.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TaskSegmentSchemaUtil
+{
+ public static SchemaPayloadPlus getSegmentSchema(File segmentFile, IndexIO
indexIO) throws IOException
Review Comment:
Do we also want to put a stopwatch here to get some notion of the time it
takes to do this?
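Something along these lines, as a rough sketch using only `System.nanoTime()` (the class `TimedCallSketch`, its nested `Timed` holder, and the `time` helper are hypothetical names, not existing Druid utilities):

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical illustration only; not part of the PR or of Druid.
public class TimedCallSketch
{
  // Holds the result of the work together with how long it took.
  public static final class Timed<T>
  {
    public final T value;
    public final long elapsedMillis;

    Timed(T value, long elapsedMillis)
    {
      this.value = value;
      this.elapsedMillis = elapsedMillis;
    }
  }

  // Runs the given work once and measures wall-clock time with a monotonic clock.
  public static <T> Timed<T> time(Supplier<T> work)
  {
    final long start = System.nanoTime();
    final T value = work.get();
    final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    return new Timed<>(value, elapsedMillis);
  }
}
```

The elapsed value could then be logged or emitted as a metric next to the schema extraction call.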
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]