github-advanced-security[bot] commented on code in PR #15817: URL: https://github.com/apache/druid/pull/15817#discussion_r1534433664
########## server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.SchemaPayload; +import org.apache.druid.segment.column.SegmentSchemaMetadata; +import org.apache.druid.timeline.SegmentId; + +import javax.annotation.Nullable; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; + +/** + * In-memory cache of segment schema. + * <p> + * Internally, mapping of segmentId to segment level information like schemaId & numRows is maintained. + * This mapping is updated on each database poll {@code finalizedSegmentStats}. + * Segment schema created since last DB poll is also fetched and updated in the cache {@code finalizedSegmentSchema}. + * <p> + * Additionally, this class caches schema for realtime segments in {@code realtimeSegmentSchemaMap}. 
This mapping + * is cleared either when the segment is removed or marked as finalized. + * <p> + * Finalized segments which do not have their schema information present in the DB, fetch their schema via SMQ. + * SMQ results are cached in {@code inTransitSMQResults}. Once the schema information is backfilled + * in the DB, it is removed from {@code inTransitSMQResults} and added to {@code inTransitSMQPublishedResults}. + * {@code inTransitSMQPublishedResults} is cleared on each successful DB poll. + */ +@LazySingleton +public class SegmentSchemaCache +{ + private static final EmittingLogger log = new EmittingLogger(SegmentSchemaCache.class); + + // Cache is marked initialized after first DB poll. + private CountDownLatch initialized = new CountDownLatch(1); + + /** + * Mapping from segmentId to segment level information which includes numRows and schemaId. + * This mapping is updated on each database poll. + */ + private volatile ConcurrentMap<SegmentId, SegmentStats> finalizedSegmentStats = new ConcurrentHashMap<>(); + + /** + * Mapping from schemaId to payload. Gets updated after DB poll. + */ + private volatile ConcurrentMap<Long, SchemaPayload> finalizedSegmentSchema = new ConcurrentHashMap<>(); + + /** + * Schema information for realtime segment. This mapping is updated when schema for realtime segment is received. + * The mapping is removed when the segment is either removed or marked as finalized. + */ + private final ConcurrentMap<SegmentId, SegmentSchemaMetadata> realtimeSegmentSchemaMap = new ConcurrentHashMap<>(); + + /** + * If the segment schema is fetched via SMQ, subsequently it is added here. + * The mapping is removed when the schema information is backfilled in the DB. + */ + private final ConcurrentMap<SegmentId, SegmentSchemaMetadata> inTransitSMQResults = new ConcurrentHashMap<>(); + + /** + * Once the schema information is backfilled in the DB, it is added here. + * This map is cleared after each DB poll. 
+ */ + private volatile ConcurrentMap<SegmentId, SegmentSchemaMetadata> inTransitSMQPublishedResults = new ConcurrentHashMap<>(); + + public void setInitialized() + { + initialized.countDown(); + log.info("SegmentSchemaCache is initialized."); + } + + public void uninitialize() + { + initialized = new CountDownLatch(1); + log.info("SegmentSchemaCache is uninitialized."); + } + + public void awaitInitialization() throws InterruptedException + { + initialized.await(); + } + + public void updateFinalizedSegmentStatsReference(ConcurrentMap<SegmentId, SegmentStats> segmentStatsMap) Review Comment: ## Exposing internal representation updateFinalizedSegmentStatsReference exposes the internal representation stored in field finalizedSegmentStats. The value may be modified [through the variable segmentStatsMap](1). [Show more details](https://github.com/apache/druid/security/code-scanning/7161) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
