cryptoe commented on code in PR #15475:
URL: https://github.com/apache/druid/pull/15475#discussion_r1426459057


##########
server/src/main/java/org/apache/druid/server/coordination/DataSegmentAnnouncer.java:
##########
@@ -32,4 +34,23 @@ public interface DataSegmentAnnouncer
   void announceSegments(Iterable<DataSegment> segments) throws IOException;
 
   void unannounceSegments(Iterable<DataSegment> segments) throws IOException;
+
+  /**
+   * Announce schema for all sinks for a given task.
+   *
+   * @param taskId taskId
+   * @param segmentSchemas absolute schema for all sinks, in case the client requests full sync.
+   * @param segmentSchemasChange schema change for all sinks
+   */
+  void announceSinkSchemaForTask(

Review Comment:
   Why the word "sink" in the interface definition? How about `announceSegmentSchema()`?
   You can mention in the Javadoc that this is only for realtime segments.



##########
server/src/main/java/org/apache/druid/segment/metadata/CentralizedDatasourceSchemaConfig.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Config for centralizing datasource schema management in Coordinator.
+ */
+public class CentralizedDatasourceSchemaConfig
+{
+  @JsonProperty
+  private boolean enabled = false;
+
+  // If realtime segment schema should be published in segment announcement flow
+  @JsonProperty
+  private boolean announceRealtimeSegmentSchema = false;

Review Comment:
   Can you mention in a comment that this property is temporary and will be removed in a future release?



##########
server/src/main/java/org/apache/druid/client/CoordinatorServerView.java:
##########
@@ -114,6 +115,13 @@ public ServerView.CallbackAction segmentViewInitialized()
             runTimelineCallbacks(TimelineCallback::timelineInitialized);
             return ServerView.CallbackAction.CONTINUE;
           }
+
+          @Override
+          public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
+          {
+            runTimelineCallbacks(callback -> callback.segmentSchemasAnnounced(segmentSchemas));

Review Comment:
   Why are listeners of the timeline notified of an event which does not touch the timeline at all?



##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -102,6 +119,14 @@ public ServerView.CallbackAction serverSegmentRemoved(
             removeServerSegment(server, segment);
             return ServerView.CallbackAction.CONTINUE;
           }
+
+          @Override
+          public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
+          {
+            log.debug("SegmentSchemas [%s]", segmentSchemas);

Review Comment:
   Let's block this flow when `realtimeSegmentSchemaAnnouncement` is disabled.
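A minimal sketch of the guard being requested, assuming a hypothetical flag on the callback's owner; the class, field, and return values here are illustrative stand-ins, not the actual Druid types:

```java
// Illustrative stand-in for the schema-announcement callback; not the real Druid classes.
public class SchemaCallbackGuard
{
  private final boolean realtimeSegmentSchemaAnnouncement;
  int schemaUpdatesApplied = 0;

  public SchemaCallbackGuard(boolean realtimeSegmentSchemaAnnouncement)
  {
    this.realtimeSegmentSchemaAnnouncement = realtimeSegmentSchemaAnnouncement;
  }

  // Mirrors segmentSchemasAnnounced(): bail out early when the feature is disabled.
  public String segmentSchemasAnnounced(String segmentSchemas)
  {
    if (!realtimeSegmentSchemaAnnouncement) {
      return "CONTINUE"; // feature disabled: skip processing entirely
    }
    schemaUpdatesApplied++; // stand-in for updateSchemaForSegments(...)
    return "CONTINUE";
  }
}
```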



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentSchemas.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.column.ColumnType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Encapsulates schema information for multiple segments.
+ * <p>
+ * Primarily used to announce schema changes for all {@link org.apache.druid.segment.realtime.plumber.Sink}
+ * created by a task in {@link StreamAppenderator}.
+ */
+public class SegmentSchemas
+{
+  private final List<SegmentSchema> segmentSchemaList;
+
+  @JsonCreator
+  public SegmentSchemas(
+      @JsonProperty("segmentSchemaList") List<SegmentSchema> segmentSchemaList
+  )
+  {
+    this.segmentSchemaList = segmentSchemaList;
+  }
+
+  @JsonProperty
+  public List<SegmentSchema> getSegmentSchemaList()
+  {
+    return segmentSchemaList;
+  }
+
+  /**
+   * Encapsulates either the absolute schema or schema change for a segment.
+   */
+  public static class SegmentSchema
+  {
+    private final String dataSource;
+    private final String segmentId;
+
+    // represents whether it is a schema change or absolute schema
+    private final boolean delta;
+
+    // absolute number of rows in the segment
+    private final Integer numRows;
+
+    // new columns in the segment
+    private final List<String> newColumns;
+
+    // updated column types, empty for absolute segment schema
+    private final List<String> updatedColumns;
+
+    // all column should have non-null types
+    private final Map<String, ColumnType> columnTypeMap;
+
+    @JsonCreator
+    public SegmentSchema(
+        @JsonProperty("dataSource") String dataSource,
+        @JsonProperty("segmentId") String segmentId,
+        @JsonProperty("delta") boolean delta,

Review Comment:
   And `delta` too?



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentSchemas.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.column.ColumnType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Encapsulates schema information for multiple segments.
+ * <p>
+ * Primarily used to announce schema changes for all {@link org.apache.druid.segment.realtime.plumber.Sink}
+ * created by a task in {@link StreamAppenderator}.
+ */
+public class SegmentSchemas
+{
+  private final List<SegmentSchema> segmentSchemaList;
+
+  @JsonCreator
+  public SegmentSchemas(
+      @JsonProperty("segmentSchemaList") List<SegmentSchema> segmentSchemaList
+  )
+  {
+    this.segmentSchemaList = segmentSchemaList;
+  }
+
+  @JsonProperty
+  public List<SegmentSchema> getSegmentSchemaList()
+  {
+    return segmentSchemaList;
+  }
+
+  /**
+   * Encapsulates either the absolute schema or schema change for a segment.
+   */
+  public static class SegmentSchema
+  {
+    private final String dataSource;
+    private final String segmentId;
+
+    // represents whether it is a schema change or absolute schema
+    private final boolean delta;
+
+    // absolute number of rows in the segment
+    private final Integer numRows;
+
+    // new columns in the segment
+    private final List<String> newColumns;
+
+    // updated column types, empty for absolute segment schema
+    private final List<String> updatedColumns;
+
+    // all column should have non-null types
+    private final Map<String, ColumnType> columnTypeMap;
+
+    @JsonCreator
+    public SegmentSchema(
+        @JsonProperty("dataSource") String dataSource,
+        @JsonProperty("segmentId") String segmentId,
+        @JsonProperty("delta") boolean delta,
+        @JsonProperty("numRows") Integer numRows,
+        @JsonProperty("newColumns") List<String> newColumns,
+        @JsonProperty("updatedColumns") List<String> updatedColumns,

Review Comment:
   Shouldn't this be nullable?



##########
server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java:
##########
@@ -148,7 +159,9 @@ public Sink(
       maxCount = hydrant.getCount();
       ReferenceCountingSegment segment = hydrant.getIncrementedSegment();
       try {
-        numRowsExcludingCurrIndex.addAndGet(segment.asQueryableIndex().getNumRows());
+        QueryableIndex index = segment.asQueryableIndex();
+        mergeIndexDimensions(new QueryableIndexStorageAdapter(index));
+        numRowsExcludingCurrIndex.addAndGet(index.getNumRows());

Review Comment:
   This is only for restoring tasks, right?



##########
server/src/main/java/org/apache/druid/client/CoordinatorServerView.java:
##########
@@ -114,6 +115,13 @@ public ServerView.CallbackAction segmentViewInitialized()
             runTimelineCallbacks(TimelineCallback::timelineInitialized);
             return ServerView.CallbackAction.CONTINUE;
           }
+
+          @Override
+          public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
+          {
+            runTimelineCallbacks(callback -> callback.segmentSchemasAnnounced(segmentSchemas));

Review Comment:
   Can you also update the PR description to explain how the data flows between each of these components?



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkSchemaUtil.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.realtime.appenderator.SegmentSchemas.SegmentSchema;
+import org.apache.druid.timeline.SegmentId;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Utility to compute schema for all sinks in a streaming ingestion task, used in {@link StreamAppenderator}.
+ */
+class SinkSchemaUtil
+{
+  /**
+   * Compute {@link SegmentSchemas} for the sinks.
+   */
+  @VisibleForTesting
+  static Optional<SegmentSchemas> computeAbsoluteSchema(
+      Map<SegmentId, Pair<RowSignature, Integer>> sinkSchemaMap
+  )
+  {
+    List<SegmentSchema> sinkSchemas = new ArrayList<>();
+
+    for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : sinkSchemaMap.entrySet()) {
+      SegmentId segmentId = entry.getKey();
+      RowSignature sinkSignature = entry.getValue().lhs;
+
+      List<String> newColumns = new ArrayList<>();
+
+      Map<String, ColumnType> columnMapping = new HashMap<>();
+
+      // new Sink
+      for (String column : sinkSignature.getColumnNames()) {
+        newColumns.add(column);
+        sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type));
+      }
+
+      Integer numRows = entry.getValue().rhs;
+      if (newColumns.size() > 0) {
+        SegmentSchema segmentSchema =
+            new SegmentSchema(
+                segmentId.getDataSource(),
+                segmentId.toString(),
+                false,
+                numRows,
+                newColumns,
+                Collections.emptyList(),
+                columnMapping
+            );
+        sinkSchemas.add(segmentSchema);
+      }
+    }
+
+    return Optional.ofNullable(sinkSchemas.isEmpty() ? null : new SegmentSchemas(sinkSchemas));
+  }
+
+  /**
+   * Compute schema change for the sinks.
+   */
+  @VisibleForTesting
+  static Optional<SegmentSchemas> computeSchemaChange(
+      Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap,
+      Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap
+  )
+  {
+    List<SegmentSchema> sinkSchemas = new ArrayList<>();
+
+    for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : currentSinkSignatureMap.entrySet()) {
+      SegmentId segmentId = entry.getKey();
+      RowSignature sinkSignature = entry.getValue().lhs;
+
+      Integer numRows = entry.getValue().rhs;
+
+      List<String> newColumns = new ArrayList<>();
+      List<String> updatedColumns = new ArrayList<>();
+      Map<String, ColumnType> columnMapping = new HashMap<>();
+
+      // whether there are any changes to be published
+      boolean publish = false;
+      // if the resultant schema is delta
+      boolean delta = false;
+
+      if (!previousSinkSignatureMap.containsKey(segmentId)) {
+        // new Sink
+        for (String column : sinkSignature.getColumnNames()) {
+          newColumns.add(column);
+          sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type));
+        }
+        if (newColumns.size() > 0 || numRows > 0) {
+          publish = true;
+        }
+      } else {
+        RowSignature previousSinkSignature = previousSinkSignatureMap.get(segmentId).lhs;
+        Set<String> previousSinkDimensions = new HashSet<>(previousSinkSignature.getColumnNames());
+
+        Integer previousNumRows = previousSinkSignatureMap.get(segmentId).rhs;
+        for (String column : sinkSignature.getColumnNames()) {
+          boolean added = false;
+          if (!previousSinkDimensions.contains(column)) {
+            newColumns.add(column);
+            added = true;
+          } else if (!Objects.equals(previousSinkSignature.getColumnType(column), sinkSignature.getColumnType(column))) {
+            updatedColumns.add(column);
+            added = true;
+          }
+
+          if (added) {
+            sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type));
+          }
+        }
+
+        if ((!Objects.equals(numRows, previousNumRows)) || (updatedColumns.size() > 0) || (newColumns.size() > 0)) {
+          publish = true;
+          delta = true;

Review Comment:
   ```suggestion
   ```



##########
server/src/main/java/org/apache/druid/client/TimelineServerView.java:
##########
@@ -101,5 +102,12 @@ interface TimelineCallback
      * @return continue or unregister
      */
     CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment);
+
+    /**
+     * Called when segment schema is announced.

Review Comment:
   Let's document the nuances of this flow and why it was done this way.



##########
server/src/main/java/org/apache/druid/server/coordination/DataSegmentAnnouncer.java:
##########
@@ -32,4 +34,23 @@ public interface DataSegmentAnnouncer
   void announceSegments(Iterable<DataSegment> segments) throws IOException;
 
   void unannounceSegments(Iterable<DataSegment> segments) throws IOException;
+
+  /**
+   * Announce schema for all sinks for a given task.
+   *
+   * @param taskId taskId
+   * @param segmentSchemas absolute schema for all sinks, in case the client requests full sync.
+   * @param segmentSchemasChange schema change for all sinks
+   */
+  void announceSinkSchemaForTask(
+      String taskId,
+      SegmentSchemas segmentSchemas,
+      @Nullable SegmentSchemas segmentSchemasChange
+  );
+
+  /**
+   * Unnannounce task.
+   * @param taskId taskId
+   */
+  void unannouceTask(String taskId);

Review Comment:
   This method does not make sense in this interface. Are you looking for a method to invalidate all the schemas for a task?
   Please add Javadocs mentioning the intended use of this method.



##########
server/src/main/java/org/apache/druid/client/BrokerServerView.java:
##########
@@ -144,6 +145,12 @@ public CallbackAction segmentViewInitialized()
             runTimelineCallbacks(TimelineCallback::timelineInitialized);
             return ServerView.CallbackAction.CONTINUE;
           }
+
+          @Override
+          public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
+          {
+            return CallbackAction.CONTINUE;

Review Comment:
   What about `CallbackAction.UNREGISTER` here?



##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -148,4 +174,165 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
       }
     }
   }
+
+  private Set<SegmentId> filterMutableSegments(Set<SegmentId> segmentIds)
+  {
+    if (realtimeSegmentSchemaAnnouncement) {
+      synchronized (lock) {
+        for (SegmentId segmentId : mutableSegments) {
+          segmentIds.remove(segmentId);
+        }
+      }
+    }
+    return segmentIds;
+  }
+
+  /**
+   * Update schema for segments.
+   */
+  @VisibleForTesting
+  void updateSchemaForSegments(SegmentSchemas segmentSchemas)
+  {
+    List<SegmentSchemas.SegmentSchema> segmentSchemaList = segmentSchemas.getSegmentSchemaList();
+
+    for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemaList) {
+      String dataSource = segmentSchema.getDataSource();
+      SegmentId segmentId = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId());

Review Comment:
   We would need a null check here. If null, let's skip it with an info log.
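A sketch of the suggested handling. Since the real `SegmentId` class isn't reproduced here, a trivial stand-in for `tryParse` is used; the method and parameter names are illustrative only:

```java
import java.util.ArrayList;
import java.util.List;

public class NullSkipSketch
{
  // Stand-in for SegmentId.tryParse: returns null when the id cannot be parsed.
  static String tryParse(String dataSource, String rawSegmentId)
  {
    return rawSegmentId.startsWith(dataSource) ? rawSegmentId : null;
  }

  public static List<String> applyUpdates(String dataSource, List<String> rawIds)
  {
    List<String> applied = new ArrayList<>();
    for (String raw : rawIds) {
      String segmentId = tryParse(dataSource, raw);
      if (segmentId == null) {
        // info-log and skip the unparseable id, per the review suggestion
        System.out.println("Skipping schema update for unparseable segment id: " + raw);
        continue;
      }
      applied.add(segmentId);
    }
    return applied;
  }
}
```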



##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -148,4 +174,165 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
       }
     }
   }
+
+  private Set<SegmentId> filterMutableSegments(Set<SegmentId> segmentIds)
+  {
+    if (realtimeSegmentSchemaAnnouncement) {
+      synchronized (lock) {
+        for (SegmentId segmentId : mutableSegments) {

Review Comment:
   Let's do `segmentIds.removeAll()`.
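The loop above collapses into a single bulk call. A sketch with `String` standing in for `SegmentId` (the real code would still do this while holding the lock):

```java
import java.util.Set;

public class RemoveAllSketch
{
  // Bulk removal instead of iterating over mutableSegments one element at a time.
  // String stands in for SegmentId here.
  public static Set<String> filterMutableSegments(Set<String> segmentIds, Set<String> mutableSegments)
  {
    segmentIds.removeAll(mutableSegments);
    return segmentIds;
  }
}
```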



##########
server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java:
##########
@@ -252,7 +253,7 @@ private void startCacheExec()
 
           try {
             while (!Thread.currentThread().isInterrupted()) {
-              final Set<SegmentId> segmentsToRefresh = new TreeSet<>();
+              final Set<SegmentId> segmentsToRefresh = new TreeSet<>(SEGMENT_ORDER);

Review Comment:
   I think this is not required:
   https://github.com/apache/druid/blob/e43bb74c3af67a8ac2d52a17aee91a47d735c027/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java#L668
   It's getting sorted further down the line.



##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -148,4 +174,165 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
       }
     }
   }
+
+  private Set<SegmentId> filterMutableSegments(Set<SegmentId> segmentIds)
+  {
+    if (realtimeSegmentSchemaAnnouncement) {
+      synchronized (lock) {
+        for (SegmentId segmentId : mutableSegments) {
+          segmentIds.remove(segmentId);
+        }
+      }
+    }
+    return segmentIds;
+  }
+
+  /**
+   * Update schema for segments.
+   */
+  @VisibleForTesting
+  void updateSchemaForSegments(SegmentSchemas segmentSchemas)
+  {
+    List<SegmentSchemas.SegmentSchema> segmentSchemaList = segmentSchemas.getSegmentSchemaList();
+
+    for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemaList) {
+      String dataSource = segmentSchema.getDataSource();
+      SegmentId segmentId = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId());
+
+      log.debug("Applying schema update for segmentId [%s] datasource [%s]", segmentId, dataSource);
+
+      segmentMetadataInfo.compute(
+          dataSource,
+          (dataSourceKey, segmentsMap) -> {
+            if (segmentsMap == null) {
+              segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER);
+            }
+            segmentsMap.compute(
+                segmentId,
+                (id, segmentMetadata) -> {
+                  if (segmentMetadata == null) {
+                    // By design, this case shouldn't arise since both segment and schema is announced in the same flow
+                    // and messages shouldn't be lost in the poll
+                    // also segment announcement should always precede schema announcement
+                    // and there shouldn't be any schema updated for removed segments
+                    log.makeAlert("Schema update [%s] for unknown segment [%s]", segmentSchema, segmentId).emit();
+                  } else {
+                    // We know this segment.
+                    Optional<RowSignature> rowSignature =
+                        mergeOrCreateRowSignature(
+                            segmentId,
+                            segmentMetadata.getRowSignature(),
+                            segmentSchema
+                        );
+                    if (rowSignature.isPresent()) {
+                      log.debug(
+                          "Segment [%s] signature [%s] after applying schema update.",
+                          segmentId,
+                          rowSignature.get()
+                      );
+                      // mark the datasource for rebuilding
+                      markDataSourceAsNeedRebuild(dataSource);
+
+                      segmentMetadata = AvailableSegmentMetadata
+                          .from(segmentMetadata)
+                          .withRowSignature(rowSignature.get())
+                          .withNumRows(segmentSchema.getNumRows())
+                          .build();
+                    }
+                  }
+                  return segmentMetadata;
+                }
+            );
+            return segmentsMap;
+          }
+      );
+    }
+  }
+
+  /**
+   * Merge or create a new RowSignature using the existing RowSignature and schema update.
+   */
+  @VisibleForTesting
+  Optional<RowSignature> mergeOrCreateRowSignature(
+      SegmentId segmentId,
+      @Nullable RowSignature existingSignature,
+      SegmentSchemas.SegmentSchema segmentSchema
+  )
+  {
+    if (!segmentSchema.isDelta()) {
+      // absolute schema
+      // override the existing signature
+      // this case could arise when the server restarts or counter mismatch between client and server
+      RowSignature.Builder builder = RowSignature.builder();
+      Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap();
+      for (String column : segmentSchema.getNewColumns()) {
+        builder.add(column, columnMapping.get(column));
+      }
+      return Optional.of(ROW_SIGNATURE_INTERNER.intern(builder.build()));
+    } else if (existingSignature != null) {
+      // delta update
+      // merge with the existing signature
+      RowSignature.Builder builder = RowSignature.builder();
+      final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();

Review Comment:
   Let's rename this to `mergedColumnTypes`?



##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -148,4 +174,165 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
       }
     }
   }
+
+  private Set<SegmentId> filterMutableSegments(Set<SegmentId> segmentIds)
+  {
+    if (realtimeSegmentSchemaAnnouncement) {
+      synchronized (lock) {
+        for (SegmentId segmentId : mutableSegments) {
+          segmentIds.remove(segmentId);
+        }
+      }
+    }
+    return segmentIds;
+  }
+
+  /**
+   * Update schema for segments.
+   */
+  @VisibleForTesting
+  void updateSchemaForSegments(SegmentSchemas segmentSchemas)
+  {
+    List<SegmentSchemas.SegmentSchema> segmentSchemaList = segmentSchemas.getSegmentSchemaList();
+
+    for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemaList) {
+      String dataSource = segmentSchema.getDataSource();
+      SegmentId segmentId = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId());
+
+      log.debug("Applying schema update for segmentId [%s] datasource [%s]", segmentId, dataSource);
+
+      segmentMetadataInfo.compute(
+          dataSource,
+          (dataSourceKey, segmentsMap) -> {
+            if (segmentsMap == null) {
+              segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER);
+            }
+            segmentsMap.compute(
+                segmentId,
+                (id, segmentMetadata) -> {
+                  if (segmentMetadata == null) {
+                    // By design, this case shouldn't arise since both segment and schema is announced in the same flow
+                    // and messages shouldn't be lost in the poll
+                    // also segment announcement should always precede schema announcement
+                    // and there shouldn't be any schema updated for removed segments
+                    log.makeAlert("Schema update [%s] for unknown segment [%s]", segmentSchema, segmentId).emit();
+                  } else {
+                    // We know this segment.
+                    Optional<RowSignature> rowSignature =
+                        mergeOrCreateRowSignature(
+                            segmentId,
+                            segmentMetadata.getRowSignature(),
+                            segmentSchema
+                        );
+                    if (rowSignature.isPresent()) {
+                      log.debug(
+                          "Segment [%s] signature [%s] after applying schema update.",
+                          segmentId,
+                          rowSignature.get()
+                      );
+                      // mark the datasource for rebuilding
+                      markDataSourceAsNeedRebuild(dataSource);

Review Comment:
   I think we can mark this datasource as needing a rebuild outside this compute?
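One way to do what's suggested, sketched with stand-in types: record inside the compute whether the signature actually changed, then mark the datasource for rebuild after the map update completes. Class and field names here are hypothetical, not the actual cache's:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RebuildOutsideCompute
{
  private final Map<String, String> segmentSignatures = new ConcurrentHashMap<>();
  final Set<String> dataSourcesNeedingRebuild = new HashSet<>();

  public void updateSchema(String dataSource, String segmentId, String newSignature)
  {
    // Track whether the compute actually changed anything...
    final boolean[] changed = {false};
    segmentSignatures.compute(segmentId, (id, existing) -> {
      if (!newSignature.equals(existing)) {
        changed[0] = true;
      }
      return newSignature;
    });
    // ...and mark the datasource for rebuild outside the compute lambda,
    // keeping side effects out of the remapping function.
    if (changed[0]) {
      dataSourcesNeedingRebuild.add(dataSource);
    }
  }
}
```

Keeping the side effect out of the remapping function also matches `ConcurrentHashMap.compute`'s guidance that the function should be short and free of updates to other mappings.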



##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -148,4 +174,165 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
       }
     }
   }
+
+  private Set<SegmentId> filterMutableSegments(Set<SegmentId> segmentIds)
+  {
+    if (realtimeSegmentSchemaAnnouncement) {
+      synchronized (lock) {
+        for (SegmentId segmentId : mutableSegments) {
+          segmentIds.remove(segmentId);
+        }
+      }
+    }
+    return segmentIds;
+  }
+
+  /**
+   * Update schema for segments.
+   */
+  @VisibleForTesting
+  void updateSchemaForSegments(SegmentSchemas segmentSchemas)
+  {
+    List<SegmentSchemas.SegmentSchema> segmentSchemaList = segmentSchemas.getSegmentSchemaList();
+
+    for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemaList) {
+      String dataSource = segmentSchema.getDataSource();
+      SegmentId segmentId = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId());
+
+      log.debug("Applying schema update for segmentId [%s] datasource [%s]", segmentId, dataSource);
+
+      segmentMetadataInfo.compute(
+          dataSource,
+          (dataSourceKey, segmentsMap) -> {
+            if (segmentsMap == null) {
+              segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER);
+            }
+            segmentsMap.compute(
+                segmentId,
+                (id, segmentMetadata) -> {
+                  if (segmentMetadata == null) {
+                    // By design, this case shouldn't arise since both segment and schema is announced in the same flow
+                    // and messages shouldn't be lost in the poll
+                    // also segment announcement should always precede schema announcement
+                    // and there shouldn't be any schema updated for removed segments
+                    log.makeAlert("Schema update [%s] for unknown segment [%s]", segmentSchema, segmentId).emit();
+                  } else {
+                    // We know this segment.
+                    Optional<RowSignature> rowSignature =
+                        mergeOrCreateRowSignature(
+                            segmentId,
+                            segmentMetadata.getRowSignature(),
+                            segmentSchema
+                        );
+                    if (rowSignature.isPresent()) {
+                      log.debug(
+                          "Segment [%s] signature [%s] after applying schema update.",
+                          segmentId,
+                          rowSignature.get()
+                      );
+                      // mark the datasource for rebuilding
+                      markDataSourceAsNeedRebuild(dataSource);
+
+                      segmentMetadata = AvailableSegmentMetadata
+                          .from(segmentMetadata)
+                          .withRowSignature(rowSignature.get())
+                          .withNumRows(segmentSchema.getNumRows())
+                          .build();
+                    }
+                  }
+                  return segmentMetadata;
+                }
+            );
+            return segmentsMap;
+          }
+      );
+    }
+  }
+
+  /**
+   * Merge or create a new RowSignature using the existing RowSignature and schema update.
+   */
+  @VisibleForTesting
+  Optional<RowSignature> mergeOrCreateRowSignature(
+      SegmentId segmentId,
+      @Nullable RowSignature existingSignature,
+      SegmentSchemas.SegmentSchema segmentSchema
+  )
+  {
+    if (!segmentSchema.isDelta()) {
+      // absolute schema
+      // override the existing signature
+      // this case could arise when the server restarts or counter mismatch between client and server
+      RowSignature.Builder builder = RowSignature.builder();
+      Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap();
+      for (String column : segmentSchema.getNewColumns()) {
+        builder.add(column, columnMapping.get(column));
+      }
+      return Optional.of(ROW_SIGNATURE_INTERNER.intern(builder.build()));
+    } else if (existingSignature != null) {
+      // delta update
+      // merge with the existing signature
+      RowSignature.Builder builder = RowSignature.builder();
+      final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
+
+      for (String column : existingSignature.getColumnNames()) {
+        final ColumnType columnType =
+            existingSignature.getColumnType(column)
+                    .orElseThrow(() -> new ISE("Encountered null type for column [%s]", column));
+
+        columnTypes.compute(column, (c, existingType) -> columnTypeMergePolicy.merge(existingType, columnType));
+      }
+
+      Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap();
+
+      // column type to be updated is not present in the existing schema
+      boolean missingUpdateColumns = false;
+      // new column to be added is already present in the existing schema
+      boolean existingNewColumns = false;
+
+      for (String column : segmentSchema.getUpdatedColumns()) {

Review Comment:
   I think differentiating between missing and updated columns adds very little value. Let's union both column lists and include them in the alert?
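To make the suggested alert concrete, here is a minimal sketch in plain Java (collection stand-ins only, not Druid's `RowSignature`/`ColumnType`; the method name `offendingColumns` is hypothetical) of unioning both offending-column lists into a single payload for one alert:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UnionAlertSketch
{
  // Union of (a) updated columns absent from the existing schema and
  // (b) "new" columns that already exist in the schema.
  static List<String> offendingColumns(
      Set<String> existingColumns,
      List<String> updatedColumns,
      List<String> newColumns
  )
  {
    List<String> offending = new ArrayList<>();
    for (String column : updatedColumns) {
      if (!existingColumns.contains(column)) {
        offending.add(column); // update targets a column the schema doesn't have
      }
    }
    for (String column : newColumns) {
      if (existingColumns.contains(column)) {
        offending.add(column); // "new" column is already present
      }
    }
    return offending;
  }

  public static void main(String[] args)
  {
    Set<String> existing = new HashSet<>(Arrays.asList("__time", "dim1"));
    List<String> offending = offendingColumns(
        existing,
        Arrays.asList("dim2"),      // updated column missing from the schema
        Arrays.asList("dim1", "m1") // dim1 already exists
    );
    System.out.println(offending); // [dim2, dim1]
  }
}
```

A single alert carrying this unioned list would replace the two separate boolean flags.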



##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -148,4 +174,165 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
       }
     }
   }
+
+  private Set<SegmentId> filterMutableSegments(Set<SegmentId> segmentIds)
+  {
+    if (realtimeSegmentSchemaAnnouncement) {
+      synchronized (lock) {
+        for (SegmentId segmentId : mutableSegments) {
+          segmentIds.remove(segmentId);
+        }
+      }
+    }
+    return segmentIds;
+  }
+
+  /**
+   * Update schema for segments.
+   */
+  @VisibleForTesting
+  void updateSchemaForSegments(SegmentSchemas segmentSchemas)
+  {
+    List<SegmentSchemas.SegmentSchema> segmentSchemaList = segmentSchemas.getSegmentSchemaList();
+
+    for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemaList) {
+      String dataSource = segmentSchema.getDataSource();
+      SegmentId segmentId = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId());
+
+      log.debug("Applying schema update for segmentId [%s] datasource [%s]", segmentId, dataSource);
+
+      segmentMetadataInfo.compute(
+          dataSource,
+          (dataSourceKey, segmentsMap) -> {
+            if (segmentsMap == null) {
+              segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER);
+            }
+            segmentsMap.compute(
+                segmentId,
+                (id, segmentMetadata) -> {
+                  if (segmentMetadata == null) {
+                    // By design, this case shouldn't arise since both segment and schema is announced in the same flow
+                    // and messages shouldn't be lost in the poll
+                    // also segment announcement should always precede schema announcement
+                    // and there shouldn't be any schema updated for removed segments
+                    log.makeAlert("Schema update [%s] for unknown segment [%s]", segmentSchema, segmentId).emit();
+                  } else {
+                    // We know this segment.
+                    Optional<RowSignature> rowSignature =
+                        mergeOrCreateRowSignature(
+                            segmentId,
+                            segmentMetadata.getRowSignature(),
+                            segmentSchema
+                        );
+                    if (rowSignature.isPresent()) {
+                      log.debug(
+                          "Segment [%s] signature [%s] after applying schema update.",
+                          segmentId,
+                          rowSignature.get()
+                      );
+                      // mark the datasource for rebuilding
+                      markDataSourceAsNeedRebuild(dataSource);
+
+                      segmentMetadata = AvailableSegmentMetadata
+                          .from(segmentMetadata)
+                          .withRowSignature(rowSignature.get())
+                          .withNumRows(segmentSchema.getNumRows())
+                          .build();
+                    }
+                  }
+                  return segmentMetadata;
+                }
+            );
+            return segmentsMap;
+          }
+      );
+    }
+  }
+
+  /**
+   * Merge or create a new RowSignature using the existing RowSignature and schema update.
+   */
+  @VisibleForTesting
+  Optional<RowSignature> mergeOrCreateRowSignature(
+      SegmentId segmentId,
+      @Nullable RowSignature existingSignature,
+      SegmentSchemas.SegmentSchema segmentSchema
+  )
+  {
+    if (!segmentSchema.isDelta()) {
+      // absolute schema
+      // override the existing signature
+      // this case could arise when the server restarts or counter mismatch between client and server
+      RowSignature.Builder builder = RowSignature.builder();
+      Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap();
+      for (String column : segmentSchema.getNewColumns()) {
+        builder.add(column, columnMapping.get(column));
+      }
+      return Optional.of(ROW_SIGNATURE_INTERNER.intern(builder.build()));
+    } else if (existingSignature != null) {
+      // delta update
+      // merge with the existing signature
+      RowSignature.Builder builder = RowSignature.builder();
+      final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
+
+      for (String column : existingSignature.getColumnNames()) {
+        final ColumnType columnType =

Review Comment:
   It seems like existingType is getting merged with the existing signature here, no?
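For context, the intended two-phase flow can be sketched with plain Java maps (the `MERGE` operator below is a hypothetical stand-in for Druid's column-type merge policy, not its real rules): the existing signature is carried over first, then the delta's updated columns are merged on top of it.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class DeltaMergeSketch
{
  // Hypothetical merge policy stand-in: widens LONG/DOUBLE conflicts to DOUBLE,
  // otherwise prefers the incoming (newer) type.
  static final BinaryOperator<String> MERGE = (existing, incoming) -> {
    if (existing == null) {
      return incoming;
    }
    if (("LONG".equals(existing) && "DOUBLE".equals(incoming))
        || ("DOUBLE".equals(existing) && "LONG".equals(incoming))) {
      return "DOUBLE";
    }
    return incoming;
  };

  static Map<String, String> mergeDelta(
      Map<String, String> existingSignature,
      List<String> updatedColumns,
      Map<String, String> updatedTypes
  )
  {
    Map<String, String> merged = new LinkedHashMap<>();
    // Phase 1: carry over the existing signature.
    existingSignature.forEach((col, type) -> merged.merge(col, type, MERGE));
    // Phase 2: merge the delta's updated columns on top of it.
    for (String col : updatedColumns) {
      merged.merge(col, updatedTypes.get(col), MERGE);
    }
    return merged;
  }
}
```

Under this stand-in policy, a delta updating `m1` from LONG to DOUBLE leaves `__time` untouched and widens only `m1`.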



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkSchemaUtil.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.realtime.appenderator.SegmentSchemas.SegmentSchema;
+import org.apache.druid.timeline.SegmentId;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Utility to compute schema for all sinks in a streaming ingestion task, used in {@link StreamAppenderator}.
+ */
+class SinkSchemaUtil
+{
+  /**
+   * Compute {@link SegmentSchemas} for the sinks.
+   */
+  @VisibleForTesting
+  static Optional<SegmentSchemas> computeAbsoluteSchema(
+      Map<SegmentId, Pair<RowSignature, Integer>> sinkSchemaMap
+  )
+  {
+    List<SegmentSchema> sinkSchemas = new ArrayList<>();
+
+    for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : sinkSchemaMap.entrySet()) {
+      SegmentId segmentId = entry.getKey();
+      RowSignature sinkSignature = entry.getValue().lhs;
+
+      List<String> newColumns = new ArrayList<>();
+
+      Map<String, ColumnType> columnMapping = new HashMap<>();
+
+      // new Sink
+      for (String column : sinkSignature.getColumnNames()) {
+        newColumns.add(column);
+        sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type));
+      }
+
+      Integer numRows = entry.getValue().rhs;
+      if (newColumns.size() > 0) {
+        SegmentSchema segmentSchema =
+            new SegmentSchema(
+                segmentId.getDataSource(),
+                segmentId.toString(),
+                false,
+                numRows,
+                newColumns,
+                Collections.emptyList(),
+                columnMapping
+            );
+        sinkSchemas.add(segmentSchema);
+      }
+    }
+
+    return Optional.ofNullable(sinkSchemas.isEmpty() ? null : new SegmentSchemas(sinkSchemas));
+  }
+
+  /**
+   * Compute schema change for the sinks.
+   */
+  @VisibleForTesting
+  static Optional<SegmentSchemas> computeSchemaChange(
+      Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap,
+      Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap
+  )
+  {
+    List<SegmentSchema> sinkSchemas = new ArrayList<>();
+
+    for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : currentSinkSignatureMap.entrySet()) {
+      SegmentId segmentId = entry.getKey();
+      RowSignature sinkSignature = entry.getValue().lhs;
+
+      Integer numRows = entry.getValue().rhs;
+
+      List<String> newColumns = new ArrayList<>();
+      List<String> updatedColumns = new ArrayList<>();
+      Map<String, ColumnType> columnMapping = new HashMap<>();
+
+      // whether there are any changes to be published
+      boolean publish = false;
+      // if the resultant schema is delta
+      boolean delta = false;
+
+      if (!previousSinkSignatureMap.containsKey(segmentId)) {
+        // new Sink
+        for (String column : sinkSignature.getColumnNames()) {
+          newColumns.add(column);
+          sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type));
+        }
+        if (newColumns.size() > 0 || numRows > 0) {
+          publish = true;
+        }
+      } else {
+        RowSignature previousSinkSignature = previousSinkSignatureMap.get(segmentId).lhs;
+        Set<String> previousSinkDimensions = new HashSet<>(previousSinkSignature.getColumnNames());
+
+        Integer previousNumRows = previousSinkSignatureMap.get(segmentId).rhs;
+        for (String column : sinkSignature.getColumnNames()) {
+          boolean added = false;
+          if (!previousSinkDimensions.contains(column)) {
+            newColumns.add(column);
+            added = true;
+          } else if (!Objects.equals(previousSinkSignature.getColumnType(column), sinkSignature.getColumnType(column))) {
+            updatedColumns.add(column);
+            added = true;
+          }
+
+          if (added) {
+            sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type));

Review Comment:
   ```suggestion
               sinkSignature.getColumnType(column).ifPresent(type -> currentColumnMapping.put(column, type));
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -1581,4 +1604,74 @@ private int calculateSinkMemoryInUsed(Sink sink)
     // Rough estimate of memory footprint of empty Sink based on actual heap dumps
     return ROUGH_OVERHEAD_PER_SINK;
   }
+
+  /**
+   * This inner class periodically computes absolute and delta schema for all the {@link StreamAppenderator#sinks}
+   * and announces them.
+   */
+  @VisibleForTesting
+  class SinkSchemaAnnouncer
+  {
+    private static final long SCHEMA_PUBLISH_DELAY_MILLIS = 0;
+    private static final long SCHEMA_PUBLISH_PERIOD_MILLIS = 60_000;
+
+    private final DataSegmentAnnouncer announcer;
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final String taskId;
+    private Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap = new HashMap<>();

Review Comment:
   Please mention that this data structure is only ever accessed by the `Sink-Schema-Announcer-1` thread, hence it's okay for it to be non-thread-safe.
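To illustrate the single-thread confinement that comment would document, a standalone sketch (the thread name and map contents mirror the PR but are illustrative here):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

public class ConfinementSketch
{
  public static void main(String[] args) throws Exception
  {
    // Single-threaded scheduled executor: all submitted tasks run on one thread.
    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(
        r -> new Thread(r, "Sink-Schema-Announcer-1")
    );
    // A plain HashMap is safe here because only the executor's one thread touches it.
    Map<String, Integer> previousSinkSignatureMap = new HashMap<>();
    Future<String> f = exec.submit(() -> {
      previousSinkSignatureMap.put("segmentId", 1);
      return Thread.currentThread().getName();
    });
    System.out.println(f.get()); // the announcer thread's name
    exec.shutdown();
  }
}
```

Documenting this confinement in a comment on the field spares future readers from wrapping it in unnecessary synchronization.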



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkSchemaUtil.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.realtime.appenderator.SegmentSchemas.SegmentSchema;
+import org.apache.druid.timeline.SegmentId;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Utility to compute schema for all sinks in a streaming ingestion task, used in {@link StreamAppenderator}.
+ */
+class SinkSchemaUtil
+{
+  /**
+   * Compute {@link SegmentSchemas} for the sinks.
+   */
+  @VisibleForTesting
+  static Optional<SegmentSchemas> computeAbsoluteSchema(
+      Map<SegmentId, Pair<RowSignature, Integer>> sinkSchemaMap
+  )
+  {
+    List<SegmentSchema> sinkSchemas = new ArrayList<>();
+
+    for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : sinkSchemaMap.entrySet()) {
+      SegmentId segmentId = entry.getKey();
+      RowSignature sinkSignature = entry.getValue().lhs;
+
+      List<String> newColumns = new ArrayList<>();
+
+      Map<String, ColumnType> columnMapping = new HashMap<>();
+
+      // new Sink
+      for (String column : sinkSignature.getColumnNames()) {
+        newColumns.add(column);
+        sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type));
+      }
+
+      Integer numRows = entry.getValue().rhs;
+      if (newColumns.size() > 0) {
+        SegmentSchema segmentSchema =
+            new SegmentSchema(
+                segmentId.getDataSource(),
+                segmentId.toString(),
+                false,
+                numRows,
+                newColumns,
+                Collections.emptyList(),
+                columnMapping
+            );
+        sinkSchemas.add(segmentSchema);
+      }
+    }
+
+    return Optional.ofNullable(sinkSchemas.isEmpty() ? null : new SegmentSchemas(sinkSchemas));
+  }
+
+  /**
+   * Compute schema change for the sinks.
+   */
+  @VisibleForTesting
+  static Optional<SegmentSchemas> computeSchemaChange(
+      Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap,
+      Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap
+  )
+  {
+    List<SegmentSchema> sinkSchemas = new ArrayList<>();
+
+    for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : currentSinkSignatureMap.entrySet()) {
+      SegmentId segmentId = entry.getKey();
+      RowSignature sinkSignature = entry.getValue().lhs;

Review Comment:
   ```suggestion
         RowSignature currentSignature = entry.getValue().lhs;
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkSchemaUtil.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.realtime.appenderator.SegmentSchemas.SegmentSchema;
+import org.apache.druid.timeline.SegmentId;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Utility to compute schema for all sinks in a streaming ingestion task, used in {@link StreamAppenderator}.
+ */
+class SinkSchemaUtil
+{
+  /**
+   * Compute {@link SegmentSchemas} for the sinks.
+   */
+  @VisibleForTesting
+  static Optional<SegmentSchemas> computeAbsoluteSchema(
+      Map<SegmentId, Pair<RowSignature, Integer>> sinkSchemaMap
+  )
+  {
+    List<SegmentSchema> sinkSchemas = new ArrayList<>();
+
+    for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : sinkSchemaMap.entrySet()) {
+      SegmentId segmentId = entry.getKey();
+      RowSignature sinkSignature = entry.getValue().lhs;
+
+      List<String> newColumns = new ArrayList<>();
+
+      Map<String, ColumnType> columnMapping = new HashMap<>();
+
+      // new Sink
+      for (String column : sinkSignature.getColumnNames()) {
+        newColumns.add(column);
+        sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type));
+      }
+
+      Integer numRows = entry.getValue().rhs;
+      if (newColumns.size() > 0) {
+        SegmentSchema segmentSchema =
+            new SegmentSchema(
+                segmentId.getDataSource(),
+                segmentId.toString(),
+                false,
+                numRows,
+                newColumns,
+                Collections.emptyList(),
+                columnMapping
+            );
+        sinkSchemas.add(segmentSchema);
+      }
+    }
+
+    return Optional.ofNullable(sinkSchemas.isEmpty() ? null : new SegmentSchemas(sinkSchemas));
+  }
+
+  /**
+   * Compute schema change for the sinks.
+   */
+  @VisibleForTesting
+  static Optional<SegmentSchemas> computeSchemaChange(
+      Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap,
+      Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap
+  )
+  {
+    List<SegmentSchema> sinkSchemas = new ArrayList<>();
+
+    for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : currentSinkSignatureMap.entrySet()) {
+      SegmentId segmentId = entry.getKey();
+      RowSignature sinkSignature = entry.getValue().lhs;
+
+      Integer numRows = entry.getValue().rhs;
+
+      List<String> newColumns = new ArrayList<>();
+      List<String> updatedColumns = new ArrayList<>();
+      Map<String, ColumnType> columnMapping = new HashMap<>();
+
+      // whether there are any changes to be published
+      boolean publish = false;
+      // if the resultant schema is delta
+      boolean delta = false;
+
+      if (!previousSinkSignatureMap.containsKey(segmentId)) {
+        // new Sink
+        for (String column : sinkSignature.getColumnNames()) {
+          newColumns.add(column);
+          sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type));
+        }
+        if (newColumns.size() > 0 || numRows > 0) {

Review Comment:
   ```suggestion
           if (newColumns.size() > 0) {
   ```
   I think we should only publish when we have new columns.



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkSchemaUtil.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.realtime.appenderator.SegmentSchemas.SegmentSchema;
+import org.apache.druid.timeline.SegmentId;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Utility to compute schema for all sinks in a streaming ingestion task, used in {@link StreamAppenderator}.
+ */
+class SinkSchemaUtil
+{
+  /**
+   * Compute {@link SegmentSchemas} for the sinks.
+   */
+  @VisibleForTesting
+  static Optional<SegmentSchemas> computeAbsoluteSchema(
+      Map<SegmentId, Pair<RowSignature, Integer>> sinkSchemaMap
+  )
+  {
+    List<SegmentSchema> sinkSchemas = new ArrayList<>();
+
+    for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : sinkSchemaMap.entrySet()) {
+      SegmentId segmentId = entry.getKey();
+      RowSignature sinkSignature = entry.getValue().lhs;
+
+      List<String> newColumns = new ArrayList<>();
+
+      Map<String, ColumnType> columnMapping = new HashMap<>();
+
+      // new Sink
+      for (String column : sinkSignature.getColumnNames()) {
+        newColumns.add(column);
+        sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type));
+      }
+
+      Integer numRows = entry.getValue().rhs;
+      if (newColumns.size() > 0) {
+        SegmentSchema segmentSchema =
+            new SegmentSchema(
+                segmentId.getDataSource(),
+                segmentId.toString(),
+                false,
+                numRows,
+                newColumns,
+                Collections.emptyList(),
+                columnMapping
+            );
+        sinkSchemas.add(segmentSchema);
+      }
+    }
+
+    return Optional.ofNullable(sinkSchemas.isEmpty() ? null : new SegmentSchemas(sinkSchemas));
+  }
+
+  /**
+   * Compute schema change for the sinks.
+   */
+  @VisibleForTesting
+  static Optional<SegmentSchemas> computeSchemaChange(
+      Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap,
+      Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap
+  )
+  {
+    List<SegmentSchema> sinkSchemas = new ArrayList<>();
+
+    for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : currentSinkSignatureMap.entrySet()) {
+      SegmentId segmentId = entry.getKey();
+      RowSignature sinkSignature = entry.getValue().lhs;
+
+      Integer numRows = entry.getValue().rhs;
+
+      List<String> newColumns = new ArrayList<>();
+      List<String> updatedColumns = new ArrayList<>();
+      Map<String, ColumnType> columnMapping = new HashMap<>();
+
+      // whether there are any changes to be published
+      boolean publish = false;

Review Comment:
   ```suggestion
         boolean toPublish = false;
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkSchemaUtil.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import 
org.apache.druid.segment.realtime.appenderator.SegmentSchemas.SegmentSchema;
+import org.apache.druid.timeline.SegmentId;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Utility to compute schema for all sinks in a streaming ingestion task, used 
in {@link StreamAppenderator}.
+ */
+class SinkSchemaUtil
+{
+  /**
+   * Compute {@link SegmentSchemas} for the sinks.
+   */
+  @VisibleForTesting
+  static Optional<SegmentSchemas> computeAbsoluteSchema(
+      Map<SegmentId, Pair<RowSignature, Integer>> sinkSchemaMap
+  )
+  {
+    List<SegmentSchema> sinkSchemas = new ArrayList<>();
+
+    for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : 
sinkSchemaMap.entrySet()) {
+      SegmentId segmentId = entry.getKey();
+      RowSignature sinkSignature = entry.getValue().lhs;
+
+      List<String> newColumns = new ArrayList<>();
+
+      Map<String, ColumnType> columnMapping = new HashMap<>();
+
+      // new Sink
+      for (String column : sinkSignature.getColumnNames()) {
+        newColumns.add(column);
+        sinkSignature.getColumnType(column).ifPresent(type -> 
columnMapping.put(column, type));
+      }
+
+      Integer numRows = entry.getValue().rhs;
+      if (newColumns.size() > 0) {
+        SegmentSchema segmentSchema =
+            new SegmentSchema(
+                segmentId.getDataSource(),
+                segmentId.toString(),
+                false,
+                numRows,
+                newColumns,
+                Collections.emptyList(),
+                columnMapping
+            );
+        sinkSchemas.add(segmentSchema);
+      }
+    }
+
+    return Optional.ofNullable(sinkSchemas.isEmpty() ? null : new 
SegmentSchemas(sinkSchemas));
+  }
+
+  /**
+   * Compute schema change for the sinks.
+   */
+  @VisibleForTesting
+  static Optional<SegmentSchemas> computeSchemaChange(
+      Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap,
+      Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap
+  )
+  {
+    List<SegmentSchema> sinkSchemas = new ArrayList<>();
+
+    for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : 
currentSinkSignatureMap.entrySet()) {
+      SegmentId segmentId = entry.getKey();
+      RowSignature sinkSignature = entry.getValue().lhs;
+
+      Integer numRows = entry.getValue().rhs;
+
+      List<String> newColumns = new ArrayList<>();
+      List<String> updatedColumns = new ArrayList<>();
+      Map<String, ColumnType> columnMapping = new HashMap<>();
+
+      // whether there are any changes to be published
+      boolean publish = false;
+      // if the resultant schema is delta
+      boolean delta = false;

Review Comment:
   ```suggestion
         boolean isDelta = false;
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java:
##########
@@ -82,7 +88,12 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
   private final long maxBytesInMemory;
   private final boolean useMaxMemoryEstimates;
   private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<>();
+
   private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
+  // columns excluding current index, includes __time column

Review Comment:
   What is curIndex, the in-memory FireHydrant? If so, let's rename it accordingly or document it.



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkSchemaUtil.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.realtime.appenderator.SegmentSchemas.SegmentSchema;
+import org.apache.druid.timeline.SegmentId;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Utility to compute schema for all sinks in a streaming ingestion task, used in {@link StreamAppenderator}.
+ */
+class SinkSchemaUtil
+{
+  /**
+   * Compute {@link SegmentSchemas} for the sinks.
+   */
+  @VisibleForTesting
+  static Optional<SegmentSchemas> computeAbsoluteSchema(
+      Map<SegmentId, Pair<RowSignature, Integer>> sinkSchemaMap
+  )
+  {
+    List<SegmentSchema> sinkSchemas = new ArrayList<>();
+
+    for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : sinkSchemaMap.entrySet()) {
+      SegmentId segmentId = entry.getKey();
+      RowSignature sinkSignature = entry.getValue().lhs;
+
+      List<String> newColumns = new ArrayList<>();
+
+      Map<String, ColumnType> columnMapping = new HashMap<>();
+
+      // new Sink
+      for (String column : sinkSignature.getColumnNames()) {
+        newColumns.add(column);
+        sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type));
+      }
+
+      Integer numRows = entry.getValue().rhs;
+      if (newColumns.size() > 0) {
+        SegmentSchema segmentSchema =
+            new SegmentSchema(
+                segmentId.getDataSource(),
+                segmentId.toString(),
+                false,
+                numRows,
+                newColumns,
+                Collections.emptyList(),
+                columnMapping
+            );
+        sinkSchemas.add(segmentSchema);
+      }
+    }
+
+    return Optional.ofNullable(sinkSchemas.isEmpty() ? null : new SegmentSchemas(sinkSchemas));
+  }
+
+  /**
+   * Compute schema change for the sinks.
+   */
+  @VisibleForTesting
+  static Optional<SegmentSchemas> computeSchemaChange(
+      Map<SegmentId, Pair<RowSignature, Integer>> previousSinkSignatureMap,
+      Map<SegmentId, Pair<RowSignature, Integer>> currentSinkSignatureMap
+  )
+  {
+    List<SegmentSchema> sinkSchemas = new ArrayList<>();
+
+    for (Map.Entry<SegmentId, Pair<RowSignature, Integer>> entry : currentSinkSignatureMap.entrySet()) {
+      SegmentId segmentId = entry.getKey();
+      RowSignature sinkSignature = entry.getValue().lhs;
+
+      Integer numRows = entry.getValue().rhs;
+
+      List<String> newColumns = new ArrayList<>();
+      List<String> updatedColumns = new ArrayList<>();
+      Map<String, ColumnType> columnMapping = new HashMap<>();
+
+      // whether there are any changes to be published
+      boolean publish = false;
+      // if the resultant schema is delta
+      boolean delta = false;
+
+      if (!previousSinkSignatureMap.containsKey(segmentId)) {
+        // new Sink
+        for (String column : sinkSignature.getColumnNames()) {
+          newColumns.add(column);
+          sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type));
+        }
+        if (newColumns.size() > 0 || numRows > 0) {
+          publish = true;
+        }
+      } else {
+        RowSignature previousSinkSignature = previousSinkSignatureMap.get(segmentId).lhs;
+        Set<String> previousSinkDimensions = new HashSet<>(previousSinkSignature.getColumnNames());
+
+        Integer previousNumRows = previousSinkSignatureMap.get(segmentId).rhs;
+        for (String column : sinkSignature.getColumnNames()) {
+          boolean added = false;
+          if (!previousSinkDimensions.contains(column)) {
+            newColumns.add(column);
+            added = true;
+          } else if (!Objects.equals(previousSinkSignature.getColumnType(column), sinkSignature.getColumnType(column))) {
+            updatedColumns.add(column);
+            added = true;
+          }
+
+          if (added) {
+            sinkSignature.getColumnType(column).ifPresent(type -> columnMapping.put(column, type));
+          }
+        }
+
+        if ((!Objects.equals(numRows, previousNumRows)) || (updatedColumns.size() > 0) || (newColumns.size() > 0)) {
+          publish = true;
+          delta = true;
+        }
+      }
+
+      if (publish) {
+        SegmentSchema segmentSchema =
+            new SegmentSchema(
+                segmentId.getDataSource(),
+                segmentId.toString(),
+                delta,

Review Comment:
   ```suggestion
                   true,
   ```
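As an aside for reviewers, the delta computation in `computeSchemaChange` boils down to diffing two column-type maps: columns present only in the current signature are "new", columns present in both but with a different type are "updated". A minimal stdlib-only sketch of that diff (hypothetical names; plain `Map<String, String>` stands in for Druid's `RowSignature`/`ColumnType`):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class SchemaDeltaSketch
{
  // Columns present in current but absent from previous.
  static List<String> newColumns(Map<String, String> previous, Map<String, String> current)
  {
    List<String> added = new ArrayList<>();
    for (String column : current.keySet()) {
      if (!previous.containsKey(column)) {
        added.add(column);
      }
    }
    return added;
  }

  // Columns present in both maps whose type changed.
  static List<String> updatedColumns(Map<String, String> previous, Map<String, String> current)
  {
    List<String> updated = new ArrayList<>();
    for (Map.Entry<String, String> entry : current.entrySet()) {
      String column = entry.getKey();
      if (previous.containsKey(column) && !Objects.equals(previous.get(column), entry.getValue())) {
        updated.add(column);
      }
    }
    return updated;
  }

  public static void main(String[] args)
  {
    Map<String, String> previous = new LinkedHashMap<>();
    previous.put("__time", "LONG");
    previous.put("dim1", "STRING");

    Map<String, String> current = new LinkedHashMap<>();
    current.put("__time", "LONG");
    current.put("dim1", "DOUBLE"); // type changed
    current.put("dim2", "STRING"); // new column

    System.out.println(newColumns(previous, current));     // [dim2]
    System.out.println(updatedColumns(previous, current)); // [dim1]
  }
}
```

A "publish" decision then reduces to checking whether either list is non-empty or the row count changed, matching the flags in the method above.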



##########
server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java:
##########
@@ -420,6 +442,51 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
     return old;
   }
 
+  /**
+   * Merge the columns from the index with the existing columns.
+   */
+  private void mergeIndexDimensions(StorageAdapter storageAdapter)

Review Comment:
   ```suggestion
  private void overWriteIndexDimensions(StorageAdapter storageAdapter)
   ```



##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -148,4 +174,165 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
       }
     }
   }
+
+  private Set<SegmentId> filterMutableSegments(Set<SegmentId> segmentIds)
+  {
+    if (realtimeSegmentSchemaAnnouncement) {
+      synchronized (lock) {
+        for (SegmentId segmentId : mutableSegments) {
+          segmentIds.remove(segmentId);
+        }
+      }
+    }
+    return segmentIds;
+  }
+
+  /**
+   * Update schema for segments.
+   */
+  @VisibleForTesting
+  void updateSchemaForSegments(SegmentSchemas segmentSchemas)
+  {
+    List<SegmentSchemas.SegmentSchema> segmentSchemaList = segmentSchemas.getSegmentSchemaList();
+
+    for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemaList) {
+      String dataSource = segmentSchema.getDataSource();
+      SegmentId segmentId = SegmentId.tryParse(dataSource, segmentSchema.getSegmentId());
+
+      log.debug("Applying schema update for segmentId [%s] datasource [%s]", segmentId, dataSource);
+
+      segmentMetadataInfo.compute(
+          dataSource,
+          (dataSourceKey, segmentsMap) -> {
+            if (segmentsMap == null) {
+              segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER);
+            }
+            segmentsMap.compute(
+                segmentId,
+                (id, segmentMetadata) -> {
+                  if (segmentMetadata == null) {
+                    // By design, this case shouldn't arise: segment and schema are announced in the same flow,
+                    // messages shouldn't be lost in the poll, segment announcement should always precede schema
+                    // announcement, and there shouldn't be any schema updates for removed segments.
+                    log.makeAlert("Schema update [%s] for unknown segment [%s]", segmentSchema, segmentId).emit();
+                  } else {
+                    // We know this segment.
+                    Optional<RowSignature> rowSignature =
+                        mergeOrCreateRowSignature(
+                            segmentId,
+                            segmentMetadata.getRowSignature(),
+                            segmentSchema
+                        );
+                    if (rowSignature.isPresent()) {
+                      log.debug(
+                          "Segment [%s] signature [%s] after applying schema update.",
+                          segmentId,
+                          rowSignature.get()
+                      );
+                      // mark the datasource for rebuilding
+                      markDataSourceAsNeedRebuild(dataSource);
+
+                      segmentMetadata = AvailableSegmentMetadata
+                          .from(segmentMetadata)
+                          .withRowSignature(rowSignature.get())
+                          .withNumRows(segmentSchema.getNumRows())
+                          .build();
+                    }
+                  }
+                  return segmentMetadata;
+                }
+            );
+            return segmentsMap;
+          }
+      );
+    }
+  }
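For readers unfamiliar with the concurrency here: `updateSchemaForSegments` relies on nested `ConcurrentHashMap.compute` calls to update one segment's entry atomically, creating the inner per-datasource map on first use. A self-contained sketch of that pattern (names are illustrative; `Long` row counts stand in for `AvailableSegmentMetadata`):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class NestedComputeSketch
{
  // datasource -> (segmentId -> numRows), mirroring the outer/inner map shape above
  static final ConcurrentHashMap<String, ConcurrentSkipListMap<String, Long>> metadata =
      new ConcurrentHashMap<>();

  // Atomically update one segment's entry, creating the inner map on first use.
  static void updateNumRows(String dataSource, String segmentId, long numRows)
  {
    metadata.compute(dataSource, (ds, segments) -> {
      if (segments == null) {
        segments = new ConcurrentSkipListMap<>();
      }
      segments.compute(segmentId, (id, existing) -> numRows);
      return segments;
    });
  }

  public static void main(String[] args)
  {
    updateNumRows("wiki", "wiki_2024-01-01", 100L);
    updateNumRows("wiki", "wiki_2024-01-01", 250L); // later update overwrites
    System.out.println(metadata.get("wiki").get("wiki_2024-01-01")); // 250
  }
}
```

The outer `compute` serializes concurrent updates for the same datasource, which is why the inner map can be created lazily without an explicit lock.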
+
+  /**
+   * Merge or create a new RowSignature using the existing RowSignature and schema update.
+   */
+  @VisibleForTesting
+  Optional<RowSignature> mergeOrCreateRowSignature(
+      SegmentId segmentId,
+      @Nullable RowSignature existingSignature,
+      SegmentSchemas.SegmentSchema segmentSchema
+  )
+  {
+    if (!segmentSchema.isDelta()) {
+      // absolute schema
+      // override the existing signature
+      // this case could arise when the server restarts or counter mismatch between client and server
+      RowSignature.Builder builder = RowSignature.builder();
+      Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap();
+      for (String column : segmentSchema.getNewColumns()) {
+        builder.add(column, columnMapping.get(column));
+      }
+      return Optional.of(ROW_SIGNATURE_INTERNER.intern(builder.build()));
+    } else if (existingSignature != null) {
+      // delta update
+      // merge with the existing signature
+      RowSignature.Builder builder = RowSignature.builder();
+      final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
+
+      for (String column : existingSignature.getColumnNames()) {
+        final ColumnType columnType =

Review Comment:
   We could just iterate over the existingSignature and populate the `columnTypes` map no?
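What the reviewer suggests could look roughly like the following sketch, with a plain `LinkedHashMap<String, String>` standing in for `RowSignature` and its `ColumnType`s (all names hypothetical): seed the map from the existing signature, then overlay the delta so new columns are appended and updated columns change type in place.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MergeSignatureSketch
{
  // Start from the existing signature's columns, then overlay the delta update.
  static Map<String, String> merge(Map<String, String> existing, Map<String, String> delta)
  {
    Map<String, String> merged = new LinkedHashMap<>(existing); // preserves existing column order
    merged.putAll(delta); // LinkedHashMap keeps the original position for re-put keys
    return merged;
  }

  public static void main(String[] args)
  {
    Map<String, String> existing = new LinkedHashMap<>();
    existing.put("__time", "LONG");
    existing.put("dim1", "STRING");

    Map<String, String> delta = new LinkedHashMap<>();
    delta.put("dim1", "DOUBLE"); // updated column
    delta.put("dim2", "STRING"); // new column

    System.out.println(merge(existing, delta));
    // {__time=LONG, dim1=DOUBLE, dim2=STRING}
  }
}
```

`LinkedHashMap` keeps insertion order stable across re-puts, so updated columns stay where they were while new columns land at the end, which matches the merge semantics the method needs.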



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

