findingrish commented on code in PR #15475:
URL: https://github.com/apache/druid/pull/15475#discussion_r1440219158


##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -148,4 +174,165 @@ public void refresh(final Set<SegmentId> 
segmentsToRefresh, final Set<String> da
       }
     }
   }
+
+  private Set<SegmentId> filterMutableSegments(Set<SegmentId> segmentIds)
+  {
+    if (realtimeSegmentSchemaAnnouncement) {
+      synchronized (lock) {
+        for (SegmentId segmentId : mutableSegments) {
+          segmentIds.remove(segmentId);
+        }
+      }
+    }
+    return segmentIds;
+  }
+
+  /**
+   * Update schema for segments.
+   */
+  @VisibleForTesting
+  void updateSchemaForSegments(SegmentSchemas segmentSchemas)
+  {
+    List<SegmentSchemas.SegmentSchema> segmentSchemaList = 
segmentSchemas.getSegmentSchemaList();
+
+    for (SegmentSchemas.SegmentSchema segmentSchema : segmentSchemaList) {
+      String dataSource = segmentSchema.getDataSource();
+      SegmentId segmentId = SegmentId.tryParse(dataSource, 
segmentSchema.getSegmentId());
+
+      log.debug("Applying schema update for segmentId [%s] datasource [%s]", 
segmentId, dataSource);
+
+      segmentMetadataInfo.compute(
+          dataSource,
+          (dataSourceKey, segmentsMap) -> {
+            if (segmentsMap == null) {
+              segmentsMap = new ConcurrentSkipListMap<>(SEGMENT_ORDER);
+            }
+            segmentsMap.compute(
+                segmentId,
+                (id, segmentMetadata) -> {
+                  if (segmentMetadata == null) {
+                    // By design, this case shouldn't arise since both segment 
and schema is announced in the same flow
+                    // and messages shouldn't be lost in the poll
+                    // also segment announcement should always precede schema 
announcement
+                    // and there shouldn't be any schema updated for removed 
segments
+                    log.makeAlert("Schema update [%s] for unknown segment 
[%s]", segmentSchema, segmentId).emit();
+                  } else {
+                    // We know this segment.
+                    Optional<RowSignature> rowSignature =
+                        mergeOrCreateRowSignature(
+                            segmentId,
+                            segmentMetadata.getRowSignature(),
+                            segmentSchema
+                        );
+                    if (rowSignature.isPresent()) {
+                      log.debug(
+                          "Segment [%s] signature [%s] after applying schema 
update.",
+                          segmentId,
+                          rowSignature.get()
+                      );
+                      // mark the datasource for rebuilding
+                      markDataSourceAsNeedRebuild(dataSource);
+
+                      segmentMetadata = AvailableSegmentMetadata
+                          .from(segmentMetadata)
+                          .withRowSignature(rowSignature.get())
+                          .withNumRows(segmentSchema.getNumRows())
+                          .build();
+                    }
+                  }
+                  return segmentMetadata;
+                }
+            );
+            return segmentsMap;
+          }
+      );
+    }
+  }
+
+  /**
+   * Merge or create a new RowSignature using the existing RowSignature and 
schema update.
+   */
+  @VisibleForTesting
+  Optional<RowSignature> mergeOrCreateRowSignature(
+      SegmentId segmentId,
+      @Nullable RowSignature existingSignature,
+      SegmentSchemas.SegmentSchema segmentSchema
+  )
+  {
+    if (!segmentSchema.isDelta()) {
+      // absolute schema
+      // override the existing signature
+      // this case could arise when the server restarts or counter mismatch 
between client and server
+      RowSignature.Builder builder = RowSignature.builder();
+      Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap();
+      for (String column : segmentSchema.getNewColumns()) {
+        builder.add(column, columnMapping.get(column));
+      }
+      return Optional.of(ROW_SIGNATURE_INTERNER.intern(builder.build()));
+    } else if (existingSignature != null) {
+      // delta update
+      // merge with the existing signature
+      RowSignature.Builder builder = RowSignature.builder();
+      final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
+
+      for (String column : existingSignature.getColumnNames()) {
+        final ColumnType columnType =
+            existingSignature.getColumnType(column)
+                    .orElseThrow(() -> new ISE("Encountered null type for 
column [%s]", column));
+
+        columnTypes.compute(column, (c, existingType) -> 
columnTypeMergePolicy.merge(existingType, columnType));
+      }
+
+      Map<String, ColumnType> columnMapping = segmentSchema.getColumnTypeMap();
+
+      // column type to be updated is not present in the existing schema
+      boolean missingUpdateColumns = false;
+      // new column to be added is already present in the existing schema
+      boolean existingNewColumns = false;
+
+      for (String column : segmentSchema.getUpdatedColumns()) {

Review Comment:
   I suggest we keep separate lists for now and unify them only if a need arises. 
   Keeping the two cases distinct may help with diagnosis if an issue comes up. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to