cryptoe commented on code in PR #15475: URL: https://github.com/apache/druid/pull/15475#discussion_r1426459057
########## server/src/main/java/org/apache/druid/server/coordination/DataSegmentAnnouncer.java: ########## @@ -32,4 +34,23 @@ public interface DataSegmentAnnouncer void announceSegments(Iterable<DataSegment> segments) throws IOException; void unannounceSegments(Iterable<DataSegment> segments) throws IOException; + + /** + * Announce schema for all sinks for a given task. + * + * @param taskId taskId + * @param segmentSchemas absolute schema for all sinks, in case the client requests full sync. + * @param segmentSchemasChange schema change for all sinks + */ + void announceSinkSchemaForTask( Review Comment: Why the word sink in the interface definition anounceSegmentSchema() ? You can mention in the JavaDoc that this is only for realtime segments ########## server/src/main/java/org/apache/druid/segment/metadata/CentralizedDatasourceSchemaConfig.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; + +/** + * Config for centralizing datasource schema management in Coordinator. 
+ */ +public class CentralizedDatasourceSchemaConfig +{ + @JsonProperty + private boolean enabled = false; + + // If realtime segment schema should be published in segment announcement flow + @JsonProperty + private boolean announceRealtimeSegmentSchema = false; Review Comment: Can you mention in a comment that this property is temp property and will be removed in future released. ########## server/src/main/java/org/apache/druid/client/CoordinatorServerView.java: ########## @@ -114,6 +115,13 @@ public ServerView.CallbackAction segmentViewInitialized() runTimelineCallbacks(TimelineCallback::timelineInitialized); return ServerView.CallbackAction.CONTINUE; } + + @Override + public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) + { + runTimelineCallbacks(callback -> callback.segmentSchemasAnnounced(segmentSchemas)); Review Comment: Why are listeners of the timeline waiting for an event which does not touch the timeline at all. ########## server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java: ########## @@ -102,6 +119,14 @@ public ServerView.CallbackAction serverSegmentRemoved( removeServerSegment(server, segment); return ServerView.CallbackAction.CONTINUE; } + + @Override + public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) + { + log.debug("SegmentSchemas [%s]", segmentSchemas); Review Comment: Lets block this flow when `realtimeSegmentSchemaAnnouncement` is disabled. ########## server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentSchemas.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.realtime.appenderator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.segment.column.ColumnType; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Encapsulates schema information for multiple segments. + * <p> + * Primarily used to announce schema changes for all {@link org.apache.druid.segment.realtime.plumber.Sink} + * created by a task in {@link StreamAppenderator}. + */ +public class SegmentSchemas +{ + private final List<SegmentSchema> segmentSchemaList; + + @JsonCreator + public SegmentSchemas( + @JsonProperty("segmentSchemaList") List<SegmentSchema> segmentSchemaList + ) + { + this.segmentSchemaList = segmentSchemaList; + } + + @JsonProperty + public List<SegmentSchema> getSegmentSchemaList() + { + return segmentSchemaList; + } + + /** + * Encapsulates either the absolute schema or schema change for a segment. 
+ */ + public static class SegmentSchema + { + private final String dataSource; + private final String segmentId; + + // represents whether it is a schema change or absolute schema + private final boolean delta; + + // absolute number of rows in the segment + private final Integer numRows; + + // new columns in the segment + private final List<String> newColumns; + + // updated column types, empty for absolute segment schema + private final List<String> updatedColumns; + + // all column should have non-null types + private final Map<String, ColumnType> columnTypeMap; + + @JsonCreator + public SegmentSchema( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("segmentId") String segmentId, + @JsonProperty("delta") boolean delta, Review Comment: and delta too ? ########## server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentSchemas.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.realtime.appenderator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.segment.column.ColumnType; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Encapsulates schema information for multiple segments. + * <p> + * Primarily used to announce schema changes for all {@link org.apache.druid.segment.realtime.plumber.Sink} + * created by a task in {@link StreamAppenderator}. + */ +public class SegmentSchemas +{ + private final List<SegmentSchema> segmentSchemaList; + + @JsonCreator + public SegmentSchemas( + @JsonProperty("segmentSchemaList") List<SegmentSchema> segmentSchemaList + ) + { + this.segmentSchemaList = segmentSchemaList; + } + + @JsonProperty + public List<SegmentSchema> getSegmentSchemaList() + { + return segmentSchemaList; + } + + /** + * Encapsulates either the absolute schema or schema change for a segment. + */ + public static class SegmentSchema + { + private final String dataSource; + private final String segmentId; + + // represents whether it is a schema change or absolute schema + private final boolean delta; + + // absolute number of rows in the segment + private final Integer numRows; + + // new columns in the segment + private final List<String> newColumns; + + // updated column types, empty for absolute segment schema + private final List<String> updatedColumns; + + // all column should have non-null types + private final Map<String, ColumnType> columnTypeMap; + + @JsonCreator + public SegmentSchema( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("segmentId") String segmentId, + @JsonProperty("delta") boolean delta, + @JsonProperty("numRows") Integer numRows, + @JsonProperty("newColumns") List<String> newColumns, + @JsonProperty("updatedColumns") List<String> updatedColumns, Review Comment: Shoudn't this be nullable ? 
########## server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java: ########## @@ -148,7 +159,9 @@ public Sink( maxCount = hydrant.getCount(); ReferenceCountingSegment segment = hydrant.getIncrementedSegment(); try { - numRowsExcludingCurrIndex.addAndGet(segment.asQueryableIndex().getNumRows()); + QueryableIndex index = segment.asQueryableIndex(); + mergeIndexDimensions(new QueryableIndexStorageAdapter(index)); + numRowsExcludingCurrIndex.addAndGet(index.getNumRows()); Review Comment: This is only for restoring tasks rite ? ########## server/src/main/java/org/apache/druid/client/CoordinatorServerView.java: ########## @@ -114,6 +115,13 @@ public ServerView.CallbackAction segmentViewInitialized() runTimelineCallbacks(TimelineCallback::timelineInitialized); return ServerView.CallbackAction.CONTINUE; } + + @Override + public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) + { + runTimelineCallbacks(callback -> callback.segmentSchemasAnnounced(segmentSchemas)); Review Comment: Can you also update the description of the PR which tells us how the data flows between each of these components. ########## server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkSchemaUtil.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.realtime.appenderator; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.realtime.appenderator.SegmentSchemas.SegmentSchema; +import org.apache.druid.timeline.SegmentId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * Utility to compute schema for all sinks in a streaming ingestion task, used in {@link StreamAppenderator}. + */ +class SinkSchemaUtil +{ + /** + * Compute {@link SegmentSchemas} for the sinks. + */ + @VisibleForTesting + static Optional<SegmentSchemas> computeAbsoluteSchema( + Map<SegmentId, Pair<RowSignature, Integer>> sinkSchemaMap + ) + { + List<SegmentSchema> sinkSchemas = new ArrayList<>(); + + for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : sinkSchemaMap.entrySet()) { + SegmentId segmentId = entry.getKey(); + RowSignature sinkSignature = entry.getValue().lhs; + + List<String> newColumns = new ArrayList<>(); + + Map<String, ColumnType> columnMapping = new HashMap<>(); + + // new Sink + for (String column : sinkSignature.getColumnNames()) { + newColumns.add(column); + sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type)); + } + + Integer numRows = entry.getValue().rhs; + if (newColumns.size() > 0) { + SegmentSchema segmentSchema = + new SegmentSchema( + segmentId.getDataSource(), + segmentId.toString(), + false, + numRows, + newColumns, + Collections.emptyList(), + columnMapping + ); + sinkSchemas.add(segmentSchema); + } + } + + return 
Optional.ofNullable(sinkSchemas.isEmpty() ? null : new SegmentSchemas(sinkSchemas)); + } + + /** + * Compute schema change for the sinks. + */ + @VisibleForTesting + static Optional<SegmentSchemas> computeSchemaChange( + Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap, + Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap + ) + { + List<SegmentSchema> sinkSchemas = new ArrayList<>(); + + for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : currentSinkSignatureMap.entrySet()) { + SegmentId segmentId = entry.getKey(); + RowSignature sinkSignature = entry.getValue().lhs; + + Integer numRows = entry.getValue().rhs; + + List<String> newColumns = new ArrayList<>(); + List<String> updatedColumns = new ArrayList<>(); + Map<String, ColumnType> columnMapping = new HashMap<>(); + + // whether there are any changes to be published + boolean publish = false; + // if the resultant schema is delta + boolean delta = false; + + if (!previousSinkSignatureMap.containsKey(segmentId)) { + // new Sink + for (String column : sinkSignature.getColumnNames()) { + newColumns.add(column); + sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type)); + } + if (newColumns.size() > 0 || numRows > 0) { + publish = true; + } + } else { + RowSignature previousSinkSignature = previousSinkSignatureMap.get(segmentId).lhs; + Set<String> previousSinkDimensions = new HashSet<>(previousSinkSignature.getColumnNames()); + + Integer previousNumRows = previousSinkSignatureMap.get(segmentId).rhs; + for (String column : sinkSignature.getColumnNames()) { + boolean added = false; + if (!previousSinkDimensions.contains(column)) { + newColumns.add(column); + added = true; + } else if (!Objects.equals(previousSinkSignature.getColumnType(column), sinkSignature.getColumnType(column))) { + updatedColumns.add(column); + added = true; + } + + if (added) { + sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type)); + } 
+ } + + if ((!Objects.equals(numRows, previousNumRows)) || (updatedColumns.size() > 0) || (newColumns.size() > 0)) { + publish = true; + delta = true; Review Comment: ```suggestion ``` ########## server/src/main/java/org/apache/druid/client/TimelineServerView.java: ########## @@ -101,5 +102,12 @@ interface TimelineCallback * @return continue or unregister */ CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment); + + /** + * Called when segment schema is announced. Review Comment: Lets document the nuances of this flow and why was it done in such a way. ########## server/src/main/java/org/apache/druid/server/coordination/DataSegmentAnnouncer.java: ########## @@ -32,4 +34,23 @@ public interface DataSegmentAnnouncer void announceSegments(Iterable<DataSegment> segments) throws IOException; void unannounceSegments(Iterable<DataSegment> segments) throws IOException; + + /** + * Announce schema for all sinks for a given task. + * + * @param taskId taskId + * @param segmentSchemas absolute schema for all sinks, in case the client requests full sync. + * @param segmentSchemasChange schema change for all sinks + */ + void announceSinkSchemaForTask( + String taskId, + SegmentSchemas segmentSchemas, + @Nullable SegmentSchemas segmentSchemasChange + ); + + /** + * Unnannounce task. + * @param taskId taskId + */ + void unannouceTask(String taskId); Review Comment: This method does not make sense in this interface. Are you looking for a method to invalidate all the schemasForATask? Please add java docs mentioning what's the intended use of this method. 
########## server/src/main/java/org/apache/druid/client/BrokerServerView.java: ########## @@ -144,6 +145,12 @@ public CallbackAction segmentViewInitialized() runTimelineCallbacks(TimelineCallback::timelineInitialized); return ServerView.CallbackAction.CONTINUE; } + + @Override + public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) + { + return CallbackAction.CONTINUE; Review Comment: What about `CallBackAction.Unregister` here ? ########## server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java: ########## @@ -148,4 +174,165 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da } } } + + private Set<SegmentId> filterMutableSegments(Set<SegmentId> segmentIds) + { + if (realtimeSegmentSchemaAnnouncement) { + synchronized (lock) { + for (SegmentId segmentId : mutableSegments) { + segmentIds.remove(segmentId); + } + } + } + return segmentIds; + } + + /** + * Update schema for segments. + */ + @VisibleForTesting + void updateSchemaForSegments(SegmentSchemas segmentSchemas) + { + List<SegmentSchemas.SegmentSchema> segmentSchemaList = segmentSchemas.getSegmentSchemaList(); + + for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemaList) { + String dataSource = segmentSchema.getDataSource(); + SegmentId segmentId = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId()); Review Comment: We would need a null check here. 
If it is null, let's skip it and log at INFO level.
+ */ + @VisibleForTesting + void updateSchemaForSegments(SegmentSchemas segmentSchemas) + { + List<SegmentSchemas.SegmentSchema> segmentSchemaList = segmentSchemas.getSegmentSchemaList(); + + for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemaList) { + String dataSource = segmentSchema.getDataSource(); + SegmentId segmentId = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId()); + + log.debug("Applying schema update for segmentId [%s] datasource [%s]", segmentId, dataSource); + + segmentMetadataInfo.compute( + dataSource, + (dataSourceKey, segmentsMap) -> { + if (segmentsMap == null) { + segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER); + } + segmentsMap.compute( + segmentId, + (id, segmentMetadata) -> { + if (segmentMetadata == null) { + // By design, this case shouldn't arise since both segment and schema is announced in the same flow + // and messages shouldn't be lost in the poll + // also segment announcement should always precede schema announcement + // and there shouldn't be any schema updated for removed segments + log.makeAlert("Schema update [%s] for unknown segment [%s]", segmentSchema, segmentId).emit(); + } else { + // We know this segment. + Optional<RowSignature> rowSignature = + mergeOrCreateRowSignature( + segmentId, + segmentMetadata.getRowSignature(), + segmentSchema + ); + if (rowSignature.isPresent()) { + log.debug( + "Segment [%s] signature [%s] after applying schema update.", + segmentId, + rowSignature.get() + ); + // mark the datasource for rebuilding + markDataSourceAsNeedRebuild(dataSource); + + segmentMetadata = AvailableSegmentMetadata + .from(segmentMetadata) + .withRowSignature(rowSignature.get()) + .withNumRows(segmentSchema.getNumRows()) + .build(); + } + } + return segmentMetadata; + } + ); + return segmentsMap; + } + ); + } + } + + /** + * Merge or create a new RowSignature using the existing RowSignature and schema update. 
+ */ + @VisibleForTesting + Optional<RowSignature> mergeOrCreateRowSignature( + SegmentId segmentId, + @Nullable RowSignature existingSignature, + SegmentSchemas.SegmentSchema segmentSchema + ) + { + if (!segmentSchema.isDelta()) { + // absolute schema + // override the existing signature + // this case could arise when the server restarts or counter mismatch between client and server + RowSignature.Builder builder = RowSignature.builder(); + Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap(); + for (String column : segmentSchema.getNewColumns()) { + builder.add(column, columnMapping.get(column)); + } + return Optional.of(ROW_SIGNATURE_INTERNER.intern(builder.build())); + } else if (existingSignature != null) { + // delta update + // merge with the existing signature + RowSignature.Builder builder = RowSignature.builder(); + final Map<String, ColumnType> columnTypes = new LinkedHashMap<>(); Review Comment: Lets rename this to mergedColumnTypes ? ########## server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java: ########## @@ -148,4 +174,165 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da } } } + + private Set<SegmentId> filterMutableSegments(Set<SegmentId> segmentIds) + { + if (realtimeSegmentSchemaAnnouncement) { + synchronized (lock) { + for (SegmentId segmentId : mutableSegments) { + segmentIds.remove(segmentId); + } + } + } + return segmentIds; + } + + /** + * Update schema for segments. 
+ */ + @VisibleForTesting + void updateSchemaForSegments(SegmentSchemas segmentSchemas) + { + List<SegmentSchemas.SegmentSchema> segmentSchemaList = segmentSchemas.getSegmentSchemaList(); + + for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemaList) { + String dataSource = segmentSchema.getDataSource(); + SegmentId segmentId = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId()); + + log.debug("Applying schema update for segmentId [%s] datasource [%s]", segmentId, dataSource); + + segmentMetadataInfo.compute( + dataSource, + (dataSourceKey, segmentsMap) -> { + if (segmentsMap == null) { + segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER); + } + segmentsMap.compute( + segmentId, + (id, segmentMetadata) -> { + if (segmentMetadata == null) { + // By design, this case shouldn't arise since both segment and schema is announced in the same flow + // and messages shouldn't be lost in the poll + // also segment announcement should always precede schema announcement + // and there shouldn't be any schema updated for removed segments + log.makeAlert("Schema update [%s] for unknown segment [%s]", segmentSchema, segmentId).emit(); + } else { + // We know this segment. + Optional<RowSignature> rowSignature = + mergeOrCreateRowSignature( + segmentId, + segmentMetadata.getRowSignature(), + segmentSchema + ); + if (rowSignature.isPresent()) { + log.debug( + "Segment [%s] signature [%s] after applying schema update.", + segmentId, + rowSignature.get() + ); + // mark the datasource for rebuilding + markDataSourceAsNeedRebuild(dataSource); Review Comment: I think we can mark this datasource needing to rebuild outside this compute ? 
########## server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java: ########## @@ -148,4 +174,165 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da } } } + + private Set<SegmentId> filterMutableSegments(Set<SegmentId> segmentIds) + { + if (realtimeSegmentSchemaAnnouncement) { + synchronized (lock) { + for (SegmentId segmentId : mutableSegments) { + segmentIds.remove(segmentId); + } + } + } + return segmentIds; + } + + /** + * Update schema for segments. + */ + @VisibleForTesting + void updateSchemaForSegments(SegmentSchemas segmentSchemas) + { + List<SegmentSchemas.SegmentSchema> segmentSchemaList = segmentSchemas.getSegmentSchemaList(); + + for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemaList) { + String dataSource = segmentSchema.getDataSource(); + SegmentId segmentId = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId()); + + log.debug("Applying schema update for segmentId [%s] datasource [%s]", segmentId, dataSource); + + segmentMetadataInfo.compute( + dataSource, + (dataSourceKey, segmentsMap) -> { + if (segmentsMap == null) { + segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER); + } + segmentsMap.compute( + segmentId, + (id, segmentMetadata) -> { + if (segmentMetadata == null) { + // By design, this case shouldn't arise since both segment and schema is announced in the same flow + // and messages shouldn't be lost in the poll + // also segment announcement should always precede schema announcement + // and there shouldn't be any schema updated for removed segments + log.makeAlert("Schema update [%s] for unknown segment [%s]", segmentSchema, segmentId).emit(); + } else { + // We know this segment. 
+ Optional<RowSignature> rowSignature = + mergeOrCreateRowSignature( + segmentId, + segmentMetadata.getRowSignature(), + segmentSchema + ); + if (rowSignature.isPresent()) { + log.debug( + "Segment [%s] signature [%s] after applying schema update.", + segmentId, + rowSignature.get() + ); + // mark the datasource for rebuilding + markDataSourceAsNeedRebuild(dataSource); + + segmentMetadata = AvailableSegmentMetadata + .from(segmentMetadata) + .withRowSignature(rowSignature.get()) + .withNumRows(segmentSchema.getNumRows()) + .build(); + } + } + return segmentMetadata; + } + ); + return segmentsMap; + } + ); + } + } + + /** + * Merge or create a new RowSignature using the existing RowSignature and schema update. + */ + @VisibleForTesting + Optional<RowSignature> mergeOrCreateRowSignature( + SegmentId segmentId, + @Nullable RowSignature existingSignature, + SegmentSchemas.SegmentSchema segmentSchema + ) + { + if (!segmentSchema.isDelta()) { + // absolute schema + // override the existing signature + // this case could arise when the server restarts or counter mismatch between client and server + RowSignature.Builder builder = RowSignature.builder(); + Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap(); + for (String column : segmentSchema.getNewColumns()) { + builder.add(column, columnMapping.get(column)); + } + return Optional.of(ROW_SIGNATURE_INTERNER.intern(builder.build())); + } else if (existingSignature != null) { + // delta update + // merge with the existing signature + RowSignature.Builder builder = RowSignature.builder(); + final Map<String, ColumnType> columnTypes = new LinkedHashMap<>(); + + for (String column : existingSignature.getColumnNames()) { + final ColumnType columnType = + existingSignature.getColumnType(column) + .orElseThrow(() -> new ISE("Encountered null type for column [%s]", column)); + + columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType)); + } + + Map<String, 
ColumnType> columnMapping = segmentSchema.getColumnTypeMap(); + + // column type to be updated is not present in the existing schema + boolean missingUpdateColumns = false; + // new column to be added is already present in the existing schema + boolean existingNewColumns = false; + + for (String column : segmentSchema.getUpdatedColumns()) { Review Comment: I think differentiating between missing , update adds very less value. Lets union both the column lists and add them to the alert ? ########## server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java: ########## @@ -148,4 +174,165 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da } } } + + private Set<SegmentId> filterMutableSegments(Set<SegmentId> segmentIds) + { + if (realtimeSegmentSchemaAnnouncement) { + synchronized (lock) { + for (SegmentId segmentId : mutableSegments) { + segmentIds.remove(segmentId); + } + } + } + return segmentIds; + } + + /** + * Update schema for segments. 
+ */ + @VisibleForTesting + void updateSchemaForSegments(SegmentSchemas segmentSchemas) + { + List<SegmentSchemas.SegmentSchema> segmentSchemaList = segmentSchemas.getSegmentSchemaList(); + + for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemaList) { + String dataSource = segmentSchema.getDataSource(); + SegmentId segmentId = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId()); + + log.debug("Applying schema update for segmentId [%s] datasource [%s]", segmentId, dataSource); + + segmentMetadataInfo.compute( + dataSource, + (dataSourceKey, segmentsMap) -> { + if (segmentsMap == null) { + segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER); + } + segmentsMap.compute( + segmentId, + (id, segmentMetadata) -> { + if (segmentMetadata == null) { + // By design, this case shouldn't arise since both segment and schema is announced in the same flow + // and messages shouldn't be lost in the poll + // also segment announcement should always precede schema announcement + // and there shouldn't be any schema updated for removed segments + log.makeAlert("Schema update [%s] for unknown segment [%s]", segmentSchema, segmentId).emit(); + } else { + // We know this segment. + Optional<RowSignature> rowSignature = + mergeOrCreateRowSignature( + segmentId, + segmentMetadata.getRowSignature(), + segmentSchema + ); + if (rowSignature.isPresent()) { + log.debug( + "Segment [%s] signature [%s] after applying schema update.", + segmentId, + rowSignature.get() + ); + // mark the datasource for rebuilding + markDataSourceAsNeedRebuild(dataSource); + + segmentMetadata = AvailableSegmentMetadata + .from(segmentMetadata) + .withRowSignature(rowSignature.get()) + .withNumRows(segmentSchema.getNumRows()) + .build(); + } + } + return segmentMetadata; + } + ); + return segmentsMap; + } + ); + } + } + + /** + * Merge or create a new RowSignature using the existing RowSignature and schema update. 
+ */ + @VisibleForTesting + Optional<RowSignature> mergeOrCreateRowSignature( + SegmentId segmentId, + @Nullable RowSignature existingSignature, + SegmentSchemas.SegmentSchema segmentSchema + ) + { + if (!segmentSchema.isDelta()) { + // absolute schema + // override the existing signature + // this case could arise when the server restarts or counter mismatch between client and server + RowSignature.Builder builder = RowSignature.builder(); + Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap(); + for (String column : segmentSchema.getNewColumns()) { + builder.add(column, columnMapping.get(column)); + } + return Optional.of(ROW_SIGNATURE_INTERNER.intern(builder.build())); + } else if (existingSignature != null) { + // delta update + // merge with the existing signature + RowSignature.Builder builder = RowSignature.builder(); + final Map<String, ColumnType> columnTypes = new LinkedHashMap<>(); + + for (String column : existingSignature.getColumnNames()) { + final ColumnType columnType = Review Comment: It seems like existingType is getting merged with existing signature ? no? ########## server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkSchemaUtil.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.realtime.appenderator; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.realtime.appenderator.SegmentSchemas.SegmentSchema; +import org.apache.druid.timeline.SegmentId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * Utility to compute schema for all sinks in a streaming ingestion task, used in {@link StreamAppenderator}. + */ +class SinkSchemaUtil +{ + /** + * Compute {@link SegmentSchemas} for the sinks. + */ + @VisibleForTesting + static Optional<SegmentSchemas> computeAbsoluteSchema( + Map<SegmentId, Pair<RowSignature, Integer>> sinkSchemaMap + ) + { + List<SegmentSchema> sinkSchemas = new ArrayList<>(); + + for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : sinkSchemaMap.entrySet()) { + SegmentId segmentId = entry.getKey(); + RowSignature sinkSignature = entry.getValue().lhs; + + List<String> newColumns = new ArrayList<>(); + + Map<String, ColumnType> columnMapping = new HashMap<>(); + + // new Sink + for (String column : sinkSignature.getColumnNames()) { + newColumns.add(column); + sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type)); + } + + Integer numRows = entry.getValue().rhs; + if (newColumns.size() > 0) { + SegmentSchema segmentSchema = + new SegmentSchema( + segmentId.getDataSource(), + segmentId.toString(), + false, + numRows, + newColumns, + Collections.emptyList(), + columnMapping + ); + sinkSchemas.add(segmentSchema); + } + } + + return 
Optional.ofNullable(sinkSchemas.isEmpty() ? null : new SegmentSchemas(sinkSchemas)); + } + + /** + * Compute schema change for the sinks. + */ + @VisibleForTesting + static Optional<SegmentSchemas> computeSchemaChange( + Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap, + Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap + ) + { + List<SegmentSchema> sinkSchemas = new ArrayList<>(); + + for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : currentSinkSignatureMap.entrySet()) { + SegmentId segmentId = entry.getKey(); + RowSignature sinkSignature = entry.getValue().lhs; + + Integer numRows = entry.getValue().rhs; + + List<String> newColumns = new ArrayList<>(); + List<String> updatedColumns = new ArrayList<>(); + Map<String, ColumnType> columnMapping = new HashMap<>(); + + // whether there are any changes to be published + boolean publish = false; + // if the resultant schema is delta + boolean delta = false; + + if (!previousSinkSignatureMap.containsKey(segmentId)) { + // new Sink + for (String column : sinkSignature.getColumnNames()) { + newColumns.add(column); + sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type)); + } + if (newColumns.size() > 0 || numRows > 0) { + publish = true; + } + } else { + RowSignature previousSinkSignature = previousSinkSignatureMap.get(segmentId).lhs; + Set<String> previousSinkDimensions = new HashSet<>(previousSinkSignature.getColumnNames()); + + Integer previousNumRows = previousSinkSignatureMap.get(segmentId).rhs; + for (String column : sinkSignature.getColumnNames()) { + boolean added = false; + if (!previousSinkDimensions.contains(column)) { + newColumns.add(column); + added = true; + } else if (!Objects.equals(previousSinkSignature.getColumnType(column), sinkSignature.getColumnType(column))) { + updatedColumns.add(column); + added = true; + } + + if (added) { + sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type)); 
Review Comment: ```suggestion sinkSignature.getColumnType(column).ifPresent(type -> currentColumnMapping.put(column, type)); ``` ########## server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java: ########## @@ -1581,4 +1604,74 @@ private int calculateSinkMemoryInUsed(Sink sink) // Rough estimate of memory footprint of empty Sink based on actual heap dumps return ROUGH_OVERHEAD_PER_SINK; } + + /** + * This inner class periodically computes absolute and delta schema for all the {@link StreamAppenderator#sinks} + * and announces them. + */ + @VisibleForTesting + class SinkSchemaAnnouncer + { + private static final long SCHEMA_PUBLISH_DELAY_MILLIS = 0; + private static final long SCHEMA_PUBLISH_PERIOD_MILLIS = 60_000; + + private final DataSegmentAnnouncer announcer; + private final ScheduledExecutorService scheduledExecutorService; + private final String taskId; + private Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap = new HashMap<>(); Review Comment: Please mention that this data structure is always accessed by the `Sink-Schema-Announcer-1` thread, hence it's okay for it to be non-thread-safe. ########## server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkSchemaUtil.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.realtime.appenderator; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.realtime.appenderator.SegmentSchemas.SegmentSchema; +import org.apache.druid.timeline.SegmentId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * Utility to compute schema for all sinks in a streaming ingestion task, used in {@link StreamAppenderator}. + */ +class SinkSchemaUtil +{ + /** + * Compute {@link SegmentSchemas} for the sinks. + */ + @VisibleForTesting + static Optional<SegmentSchemas> computeAbsoluteSchema( + Map<SegmentId, Pair<RowSignature, Integer>> sinkSchemaMap + ) + { + List<SegmentSchema> sinkSchemas = new ArrayList<>(); + + for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : sinkSchemaMap.entrySet()) { + SegmentId segmentId = entry.getKey(); + RowSignature sinkSignature = entry.getValue().lhs; + + List<String> newColumns = new ArrayList<>(); + + Map<String, ColumnType> columnMapping = new HashMap<>(); + + // new Sink + for (String column : sinkSignature.getColumnNames()) { + newColumns.add(column); + sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type)); + } + + Integer numRows = entry.getValue().rhs; + if (newColumns.size() > 0) { + SegmentSchema segmentSchema = + new SegmentSchema( + segmentId.getDataSource(), + segmentId.toString(), + false, + numRows, + newColumns, + Collections.emptyList(), + columnMapping + ); + sinkSchemas.add(segmentSchema); + } + } + + return 
Optional.ofNullable(sinkSchemas.isEmpty() ? null : new SegmentSchemas(sinkSchemas)); + } + + /** + * Compute schema change for the sinks. + */ + @VisibleForTesting + static Optional<SegmentSchemas> computeSchemaChange( + Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap, + Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap + ) + { + List<SegmentSchema> sinkSchemas = new ArrayList<>(); + + for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : currentSinkSignatureMap.entrySet()) { + SegmentId segmentId = entry.getKey(); + RowSignature sinkSignature = entry.getValue().lhs; Review Comment: ```suggestion RowSignature currentSignature = entry.getValue().lhs; ``` ########## server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkSchemaUtil.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.realtime.appenderator; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.realtime.appenderator.SegmentSchemas.SegmentSchema; +import org.apache.druid.timeline.SegmentId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * Utility to compute schema for all sinks in a streaming ingestion task, used in {@link StreamAppenderator}. + */ +class SinkSchemaUtil +{ + /** + * Compute {@link SegmentSchemas} for the sinks. + */ + @VisibleForTesting + static Optional<SegmentSchemas> computeAbsoluteSchema( + Map<SegmentId, Pair<RowSignature, Integer>> sinkSchemaMap + ) + { + List<SegmentSchema> sinkSchemas = new ArrayList<>(); + + for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : sinkSchemaMap.entrySet()) { + SegmentId segmentId = entry.getKey(); + RowSignature sinkSignature = entry.getValue().lhs; + + List<String> newColumns = new ArrayList<>(); + + Map<String, ColumnType> columnMapping = new HashMap<>(); + + // new Sink + for (String column : sinkSignature.getColumnNames()) { + newColumns.add(column); + sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type)); + } + + Integer numRows = entry.getValue().rhs; + if (newColumns.size() > 0) { + SegmentSchema segmentSchema = + new SegmentSchema( + segmentId.getDataSource(), + segmentId.toString(), + false, + numRows, + newColumns, + Collections.emptyList(), + columnMapping + ); + sinkSchemas.add(segmentSchema); + } + } + + return Optional.ofNullable(sinkSchemas.isEmpty() ? 
null : new SegmentSchemas(sinkSchemas)); + } + + /** + * Compute schema change for the sinks. + */ + @VisibleForTesting + static Optional<SegmentSchemas> computeSchemaChange( + Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap, + Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap + ) + { + List<SegmentSchema> sinkSchemas = new ArrayList<>(); + + for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : currentSinkSignatureMap.entrySet()) { + SegmentId segmentId = entry.getKey(); + RowSignature sinkSignature = entry.getValue().lhs; + + Integer numRows = entry.getValue().rhs; + + List<String> newColumns = new ArrayList<>(); + List<String> updatedColumns = new ArrayList<>(); + Map<String, ColumnType> columnMapping = new HashMap<>(); + + // whether there are any changes to be published + boolean publish = false; + // if the resultant schema is delta + boolean delta = false; + + if (!previousSinkSignatureMap.containsKey(segmentId)) { + // new Sink + for (String column : sinkSignature.getColumnNames()) { + newColumns.add(column); + sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type)); + } + if (newColumns.size() > 0 || numRows > 0) { Review Comment: ```suggestion if (newColumns.size() > 0) { ``` I think we should only publish when we have newCols. ########## server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkSchemaUtil.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.realtime.appenderator; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.realtime.appenderator.SegmentSchemas.SegmentSchema; +import org.apache.druid.timeline.SegmentId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * Utility to compute schema for all sinks in a streaming ingestion task, used in {@link StreamAppenderator}. + */ +class SinkSchemaUtil +{ + /** + * Compute {@link SegmentSchemas} for the sinks. 
+ */ + @VisibleForTesting + static Optional<SegmentSchemas> computeAbsoluteSchema( + Map<SegmentId, Pair<RowSignature, Integer>> sinkSchemaMap + ) + { + List<SegmentSchema> sinkSchemas = new ArrayList<>(); + + for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : sinkSchemaMap.entrySet()) { + SegmentId segmentId = entry.getKey(); + RowSignature sinkSignature = entry.getValue().lhs; + + List<String> newColumns = new ArrayList<>(); + + Map<String, ColumnType> columnMapping = new HashMap<>(); + + // new Sink + for (String column : sinkSignature.getColumnNames()) { + newColumns.add(column); + sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type)); + } + + Integer numRows = entry.getValue().rhs; + if (newColumns.size() > 0) { + SegmentSchema segmentSchema = + new SegmentSchema( + segmentId.getDataSource(), + segmentId.toString(), + false, + numRows, + newColumns, + Collections.emptyList(), + columnMapping + ); + sinkSchemas.add(segmentSchema); + } + } + + return Optional.ofNullable(sinkSchemas.isEmpty() ? null : new SegmentSchemas(sinkSchemas)); + } + + /** + * Compute schema change for the sinks. 
+ */ + @VisibleForTesting + static Optional<SegmentSchemas> computeSchemaChange( + Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap, + Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap + ) + { + List<SegmentSchema> sinkSchemas = new ArrayList<>(); + + for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : currentSinkSignatureMap.entrySet()) { + SegmentId segmentId = entry.getKey(); + RowSignature sinkSignature = entry.getValue().lhs; + + Integer numRows = entry.getValue().rhs; + + List<String> newColumns = new ArrayList<>(); + List<String> updatedColumns = new ArrayList<>(); + Map<String, ColumnType> columnMapping = new HashMap<>(); + + // whether there are any changes to be published + boolean publish = false; Review Comment: ```suggestion boolean toPublish = false; ``` ########## server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkSchemaUtil.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.realtime.appenderator; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.realtime.appenderator.SegmentSchemas.SegmentSchema; +import org.apache.druid.timeline.SegmentId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * Utility to compute schema for all sinks in a streaming ingestion task, used in {@link StreamAppenderator}. + */ +class SinkSchemaUtil +{ + /** + * Compute {@link SegmentSchemas} for the sinks. + */ + @VisibleForTesting + static Optional<SegmentSchemas> computeAbsoluteSchema( + Map<SegmentId, Pair<RowSignature, Integer>> sinkSchemaMap + ) + { + List<SegmentSchema> sinkSchemas = new ArrayList<>(); + + for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : sinkSchemaMap.entrySet()) { + SegmentId segmentId = entry.getKey(); + RowSignature sinkSignature = entry.getValue().lhs; + + List<String> newColumns = new ArrayList<>(); + + Map<String, ColumnType> columnMapping = new HashMap<>(); + + // new Sink + for (String column : sinkSignature.getColumnNames()) { + newColumns.add(column); + sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type)); + } + + Integer numRows = entry.getValue().rhs; + if (newColumns.size() > 0) { + SegmentSchema segmentSchema = + new SegmentSchema( + segmentId.getDataSource(), + segmentId.toString(), + false, + numRows, + newColumns, + Collections.emptyList(), + columnMapping + ); + sinkSchemas.add(segmentSchema); + } + } + + return Optional.ofNullable(sinkSchemas.isEmpty() ? 
null : new SegmentSchemas(sinkSchemas)); + } + + /** + * Compute schema change for the sinks. + */ + @VisibleForTesting + static Optional<SegmentSchemas> computeSchemaChange( + Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap, + Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap + ) + { + List<SegmentSchema> sinkSchemas = new ArrayList<>(); + + for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : currentSinkSignatureMap.entrySet()) { + SegmentId segmentId = entry.getKey(); + RowSignature sinkSignature = entry.getValue().lhs; + + Integer numRows = entry.getValue().rhs; + + List<String> newColumns = new ArrayList<>(); + List<String> updatedColumns = new ArrayList<>(); + Map<String, ColumnType> columnMapping = new HashMap<>(); + + // whether there are any changes to be published + boolean publish = false; + // if the resultant schema is delta + boolean delta = false; Review Comment: ```suggestion boolean isDelta = false; ``` ########## server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java: ########## @@ -82,7 +88,12 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink> private final long maxBytesInMemory; private final boolean useMaxMemoryEstimates; private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<>(); + private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>(); + // columns excluding current index, includes __time column Review Comment: what is curIndex, the inmemory Fire hydrant ? If yes lets rename it like that / or document it ########## server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkSchemaUtil.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.realtime.appenderator; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.realtime.appenderator.SegmentSchemas.SegmentSchema; +import org.apache.druid.timeline.SegmentId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * Utility to compute schema for all sinks in a streaming ingestion task, used in {@link StreamAppenderator}. + */ +class SinkSchemaUtil +{ + /** + * Compute {@link SegmentSchemas} for the sinks. 
+ */ + @VisibleForTesting + static Optional<SegmentSchemas> computeAbsoluteSchema( + Map<SegmentId, Pair<RowSignature, Integer>> sinkSchemaMap + ) + { + List<SegmentSchema> sinkSchemas = new ArrayList<>(); + + for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : sinkSchemaMap.entrySet()) { + SegmentId segmentId = entry.getKey(); + RowSignature sinkSignature = entry.getValue().lhs; + + List<String> newColumns = new ArrayList<>(); + + Map<String, ColumnType> columnMapping = new HashMap<>(); + + // new Sink + for (String column : sinkSignature.getColumnNames()) { + newColumns.add(column); + sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type)); + } + + Integer numRows = entry.getValue().rhs; + if (newColumns.size() > 0) { + SegmentSchema segmentSchema = + new SegmentSchema( + segmentId.getDataSource(), + segmentId.toString(), + false, + numRows, + newColumns, + Collections.emptyList(), + columnMapping + ); + sinkSchemas.add(segmentSchema); + } + } + + return Optional.ofNullable(sinkSchemas.isEmpty() ? null : new SegmentSchemas(sinkSchemas)); + } + + /** + * Compute schema change for the sinks. 
+ */ + @VisibleForTesting + static Optional<SegmentSchemas> computeSchemaChange( + Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap, + Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap + ) + { + List<SegmentSchema> sinkSchemas = new ArrayList<>(); + + for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : currentSinkSignatureMap.entrySet()) { + SegmentId segmentId = entry.getKey(); + RowSignature sinkSignature = entry.getValue().lhs; + + Integer numRows = entry.getValue().rhs; + + List<String> newColumns = new ArrayList<>(); + List<String> updatedColumns = new ArrayList<>(); + Map<String, ColumnType> columnMapping = new HashMap<>(); + + // whether there are any changes to be published + boolean publish = false; + // if the resultant schema is delta + boolean delta = false; + + if (!previousSinkSignatureMap.containsKey(segmentId)) { + // new Sink + for (String column : sinkSignature.getColumnNames()) { + newColumns.add(column); + sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type)); + } + if (newColumns.size() > 0 || numRows > 0) { + publish = true; + } + } else { + RowSignature previousSinkSignature = previousSinkSignatureMap.get(segmentId).lhs; + Set<String> previousSinkDimensions = new HashSet<>(previousSinkSignature.getColumnNames()); + + Integer previousNumRows = previousSinkSignatureMap.get(segmentId).rhs; + for (String column : sinkSignature.getColumnNames()) { + boolean added = false; + if (!previousSinkDimensions.contains(column)) { + newColumns.add(column); + added = true; + } else if (!Objects.equals(previousSinkSignature.getColumnType(column), sinkSignature.getColumnType(column))) { + updatedColumns.add(column); + added = true; + } + + if (added) { + sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type)); + } + } + + if ((!Objects.equals(numRows, previousNumRows)) || (updatedColumns.size() > 0) || (newColumns.size() > 0)) { + publish = true; + 
delta = true; + } + } + + if (publish) { + SegmentSchema segmentSchema = + new SegmentSchema( + segmentId.getDataSource(), + segmentId.toString(), + delta, Review Comment: ```suggestion true, ``` ########## server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java: ########## @@ -420,6 +442,51 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) return old; } + /** + * Merge the column from the index with the existing columns. + */ + private void mergeIndexDimensions(StorageAdapter storageAdapter) Review Comment: ```suggestion private void overWriteIndexDimensions(StorageAdapter storageAdapter) ``` ########## server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java: ########## @@ -148,4 +174,165 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da } } } + + private Set<SegmentId> filterMutableSegments(Set<SegmentId> segmentIds) + { + if (realtimeSegmentSchemaAnnouncement) { + synchronized (lock) { + for (SegmentId segmentId : mutableSegments) { + segmentIds.remove(segmentId); + } + } + } + return segmentIds; + } + + /** + * Update schema for segments. 
+ */ + @VisibleForTesting + void updateSchemaForSegments(SegmentSchemas segmentSchemas) + { + List<SegmentSchemas.SegmentSchema> segmentSchemaList = segmentSchemas.getSegmentSchemaList(); + + for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemaList) { + String dataSource = segmentSchema.getDataSource(); + SegmentId segmentId = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId()); + + log.debug("Applying schema update for segmentId [%s] datasource [%s]", segmentId, dataSource); + + segmentMetadataInfo.compute( + dataSource, + (dataSourceKey, segmentsMap) -> { + if (segmentsMap == null) { + segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER); + } + segmentsMap.compute( + segmentId, + (id, segmentMetadata) -> { + if (segmentMetadata == null) { + // By design, this case shouldn't arise since both segment and schema is announced in the same flow + // and messages shouldn't be lost in the poll + // also segment announcement should always precede schema announcement + // and there shouldn't be any schema updated for removed segments + log.makeAlert("Schema update [%s] for unknown segment [%s]", segmentSchema, segmentId).emit(); + } else { + // We know this segment. + Optional<RowSignature> rowSignature = + mergeOrCreateRowSignature( + segmentId, + segmentMetadata.getRowSignature(), + segmentSchema + ); + if (rowSignature.isPresent()) { + log.debug( + "Segment [%s] signature [%s] after applying schema update.", + segmentId, + rowSignature.get() + ); + // mark the datasource for rebuilding + markDataSourceAsNeedRebuild(dataSource); + + segmentMetadata = AvailableSegmentMetadata + .from(segmentMetadata) + .withRowSignature(rowSignature.get()) + .withNumRows(segmentSchema.getNumRows()) + .build(); + } + } + return segmentMetadata; + } + ); + return segmentsMap; + } + ); + } + } + + /** + * Merge or create a new RowSignature using the existing RowSignature and schema update. 
+ */ + @VisibleForTesting + Optional<RowSignature> mergeOrCreateRowSignature( + SegmentId segmentId, + @Nullable RowSignature existingSignature, + SegmentSchemas.SegmentSchema segmentSchema + ) + { + if (!segmentSchema.isDelta()) { + // absolute schema + // override the existing signature + // this case could arise when the server restarts or counter mismatch between client and server + RowSignature.Builder builder = RowSignature.builder(); + Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap(); + for (String column : segmentSchema.getNewColumns()) { + builder.add(column, columnMapping.get(column)); + } + return Optional.of(ROW_SIGNATURE_INTERNER.intern(builder.build())); + } else if (existingSignature != null) { + // delta update + // merge with the existing signature + RowSignature.Builder builder = RowSignature.builder(); + final Map<String, ColumnType> columnTypes = new LinkedHashMap<>(); + + for (String column : existingSignature.getColumnNames()) { + final ColumnType columnType = Review Comment: We could just iterate over the existingSignature and populate the `columnTypes` map no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
