This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/30.0.0 by this push:
     new fb86ada253d [Backport] Fix backwards compatibility with centralized schema config in partial_index_merge tasks (#16566)
fb86ada253d is described below

commit fb86ada253d079213b7e4bc337d30b166ace5939
Author: Rishabh Singh <[email protected]>
AuthorDate: Thu Jun 6 21:06:02 2024 +0530

    [Backport] Fix backwards compatibility with centralized schema config in partial_index_merge tasks (#16566)
    
    * Fix backwards compatibility with centralized schema config in partial_index_merge tasks (#16556)
    
    * Handle null values of centralized schema config in PartialMergeTask
    
    * Fix checkstyle
    
    * Do not pass centralized schema config from supervisor task to sub-tasks
    
    * Do not pass ObjectMapper in constructor of task
    
    * Fix logs
    
    * Fix tests
    
    * Revert log line to debug (#16565)
    
    ---------
    
    Co-authored-by: Kashif Faraz <[email protected]>
---
 .../batch/parallel/ParallelIndexSupervisorTask.java  |  6 ++----
 ...alGenericSegmentMergeParallelIndexTaskRunner.java | 14 ++------------
 .../parallel/PartialGenericSegmentMergeTask.java     | 20 ++------------------
 .../task/batch/parallel/PartialSegmentMergeTask.java | 14 ++++----------
 .../parallel/PartialGenericSegmentMergeTaskTest.java |  9 ++-------
 .../metadata/AbstractSegmentMetadataCache.java       |  2 +-
 .../druid/segment/metadata/FingerprintGenerator.java | 14 ++++++++------
 7 files changed, 21 insertions(+), 58 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index e2c0681d001..4ca0f1ff80d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -441,9 +441,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask
         ingestionSchema.getDataSchema(),
         ioConfigs,
         ingestionSchema.getTuningConfig(),
-        getContext(),
-        toolbox.getJsonMapper(),
-        toolbox.getCentralizedTableSchemaConfig()
+        getContext()
     );
   }
 
@@ -1620,7 +1618,7 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask
     for (PushedSegmentsReport pushedSegmentsReport : 
completedSubtaskReports.values()) {
       TaskReport.ReportMap taskReport = pushedSegmentsReport.getTaskReport();
       if (taskReport == null || taskReport.isEmpty()) {
-        LOG.warn("Received an empty report from subtask[%s]" + 
pushedSegmentsReport.getTaskId());
+        LOG.warn("Received an empty report from sub-task[%s].", 
pushedSegmentsReport.getTaskId());
         continue;
       }
       RowIngestionMetersTotals rowIngestionMetersTotals = 
getBuildSegmentsStatsFromTaskReport(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java
index 0c743d1f186..8babf50d826 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java
@@ -19,12 +19,10 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 
 import java.util.Iterator;
 import java.util.List;
@@ -40,8 +38,6 @@ class PartialGenericSegmentMergeParallelIndexTaskRunner
 
   private final DataSchema dataSchema;
   private final List<PartialSegmentMergeIOConfig> mergeIOConfigs;
-  private final CentralizedDatasourceSchemaConfig 
centralizedDatasourceSchemaConfig;
-  private final ObjectMapper mapper;
 
   PartialGenericSegmentMergeParallelIndexTaskRunner(
       TaskToolbox toolbox,
@@ -51,17 +47,13 @@ class PartialGenericSegmentMergeParallelIndexTaskRunner
       DataSchema dataSchema,
       List<PartialSegmentMergeIOConfig> mergeIOConfigs,
       ParallelIndexTuningConfig tuningConfig,
-      Map<String, Object> context,
-      ObjectMapper mapper,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+      Map<String, Object> context
   )
   {
     super(toolbox, taskId, groupId, baseSubtaskSpecName, tuningConfig, 
context);
 
     this.dataSchema = dataSchema;
     this.mergeIOConfigs = mergeIOConfigs;
-    this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
-    this.mapper = mapper;
   }
 
   @Override
@@ -110,9 +102,7 @@ class PartialGenericSegmentMergeParallelIndexTaskRunner
             subtaskSpecId,
             numAttempts,
             ingestionSpec,
-            getContext(),
-            centralizedDatasourceSchemaConfig,
-            mapper
+            getContext()
         );
       }
     };
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java
index be44fb282ef..989f0a77daa 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java
@@ -19,11 +19,9 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
-import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableSet;
@@ -31,7 +29,6 @@ import com.google.common.collect.Table;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.timeline.partition.BuildingShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
@@ -53,8 +50,6 @@ public class PartialGenericSegmentMergeTask extends 
PartialSegmentMergeTask<Buil
   private final PartialSegmentMergeIngestionSpec ingestionSchema;
   private final Table<Interval, Integer, BuildingShardSpec<?>> 
intervalAndIntegerToShardSpec;
 
-  private final CentralizedDatasourceSchemaConfig 
centralizedDatasourceSchemaConfig;
-
   @JsonCreator
   public PartialGenericSegmentMergeTask(
       // id shouldn't be null except when this task is created by 
ParallelIndexSupervisorTask
@@ -66,9 +61,7 @@ public class PartialGenericSegmentMergeTask extends 
PartialSegmentMergeTask<Buil
       @JsonProperty("subtaskSpecId") @Nullable final String subtaskSpecId,
       @JsonProperty("numAttempts") final int numAttempts, // zero-based 
counting
       @JsonProperty("spec") final PartialSegmentMergeIngestionSpec 
ingestionSchema,
-      @JsonProperty("context") final Map<String, Object> context,
-      @JsonProperty("centralizedDatasourceSchemaConfig") 
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
-      @JacksonInject ObjectMapper mapper
+      @JsonProperty("context") final Map<String, Object> context
   )
   {
     super(
@@ -81,12 +74,9 @@ public class PartialGenericSegmentMergeTask extends 
PartialSegmentMergeTask<Buil
         ingestionSchema.getIOConfig(),
         ingestionSchema.getTuningConfig(),
         numAttempts,
-        context,
-        mapper,
-        centralizedDatasourceSchemaConfig
+        context
     );
 
-    this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
     this.ingestionSchema = ingestionSchema;
     this.intervalAndIntegerToShardSpec = createIntervalAndIntegerToShardSpec(
         ingestionSchema.getIOConfig().getPartitionLocations()
@@ -127,12 +117,6 @@ public class PartialGenericSegmentMergeTask extends 
PartialSegmentMergeTask<Buil
     return ingestionSchema;
   }
 
-  @JsonProperty("centralizedDatasourceSchemaConfig")
-  private CentralizedDatasourceSchemaConfig 
getCentralizedDatasourceSchemaConfig()
-  {
-    return centralizedDatasourceSchemaConfig;
-  }
-
   @Override
   public String getType()
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index 7f0208417ff..17d9936c685 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -20,7 +20,6 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Maps;
@@ -84,8 +83,6 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> 
extends PerfectRollu
   private final PartialSegmentMergeIOConfig ioConfig;
   private final int numAttempts;
   private final String subtaskSpecId;
-  private final CentralizedDatasourceSchemaConfig 
centralizedDatasourceSchemaConfig;
-  private final FingerprintGenerator fingerprintGenerator;
 
   PartialSegmentMergeTask(
       // id shouldn't be null except when this task is created by 
ParallelIndexSupervisorTask
@@ -98,9 +95,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> 
extends PerfectRollu
       PartialSegmentMergeIOConfig ioConfig,
       ParallelIndexTuningConfig tuningConfig,
       final int numAttempts, // zero-based counting
-      final Map<String, Object> context,
-      final ObjectMapper mapper,
-      final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+      final Map<String, Object> context
   )
   {
     super(
@@ -120,8 +115,6 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> 
extends PerfectRollu
     this.subtaskSpecId = subtaskSpecId;
     this.ioConfig = ioConfig;
     this.numAttempts = numAttempts;
-    this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
-    this.fingerprintGenerator = new FingerprintGenerator(mapper);
   }
 
   @JsonProperty
@@ -265,6 +258,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> 
extends PerfectRollu
     final Set<DataSegment> pushedSegments = new HashSet<>();
     final SegmentSchemaMapping segmentSchemaMapping = new 
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
 
+    final FingerprintGenerator fingerprintGenerator = new 
FingerprintGenerator(toolbox.getJsonMapper());
     for (Entry<Interval, Int2ObjectMap<List<File>>> entryPerInterval : 
intervalToUnzippedFiles.entrySet()) {
       final Interval interval = entryPerInterval.getKey();
       for (Int2ObjectMap.Entry<List<File>> entryPerBucketId : 
entryPerInterval.getValue().int2ObjectEntrySet()) {
@@ -315,7 +309,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> 
extends PerfectRollu
         long pushFinishTime = System.nanoTime();
         pushedSegments.add(segment);
 
-        if (centralizedDatasourceSchemaConfig.isEnabled()) {
+        if (toolbox.getCentralizedTableSchemaConfig().isEnabled()) {
           SchemaPayloadPlus schemaPayloadPlus =
               
TaskSegmentSchemaUtil.getSegmentSchema(mergedFileAndDimensionNames.lhs, 
toolbox.getIndexIO());
           segmentSchemaMapping.addSchema(
@@ -341,7 +335,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> 
extends PerfectRollu
         );
       }
     }
-    if (centralizedDatasourceSchemaConfig.isEnabled()) {
+    if (toolbox.getCentralizedTableSchemaConfig().isEnabled()) {
       LOG.info("SegmentSchema for the pushed segments is [%s]", 
segmentSchemaMapping);
     }
     return new DataSegmentsWithSchemas(pushedSegments, 
segmentSchemaMapping.isNonEmpty() ? segmentSchemaMapping : null);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
index d35bba9b4ac..a51856f7353 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Before;
@@ -104,9 +103,7 @@ public class PartialGenericSegmentMergeTaskTest extends 
AbstractParallelIndexSup
         ParallelIndexTestingFactory.SUBTASK_SPEC_ID,
         ParallelIndexTestingFactory.NUM_ATTEMPTS,
         ingestionSpec,
-        ParallelIndexTestingFactory.CONTEXT,
-        CentralizedDatasourceSchemaConfig.create(),
-        null
+        ParallelIndexTestingFactory.CONTEXT
     );
   }
 
@@ -143,9 +140,7 @@ public class PartialGenericSegmentMergeTaskTest extends 
AbstractParallelIndexSup
                 .partitionsSpec(partitionsSpec)
                 .build()
         ),
-        ParallelIndexTestingFactory.CONTEXT,
-        CentralizedDatasourceSchemaConfig.create(),
-        null
+        ParallelIndexTestingFactory.CONTEXT
     );
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
 
b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
index 5c38bb7aaaa..9cb2297db82 100644
--- 
a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
+++ 
b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
@@ -733,7 +733,7 @@ public abstract class AbstractSegmentMetadataCache<T 
extends DataSourceInformati
           log.warn("Got analysis for segment [%s] we didn't ask for, 
ignoring.", analysis.getId());
         } else {
           final RowSignature rowSignature = analysisToRowSignature(analysis);
-          log.info("Segment[%s] has signature[%s].", segmentId, rowSignature);
+          log.debug("Segment[%s] has signature[%s].", segmentId, rowSignature);
 
           if (segmentMetadataQueryResultHandler(dataSource, segmentId, 
rowSignature, analysis)) {
             retVal.add(segmentId);
diff --git 
a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java
 
b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java
index ffaccd09bdf..f1cb8ea0a50 100644
--- 
a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java
+++ 
b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java
@@ -25,6 +25,7 @@ import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
 import com.google.common.primitives.Ints;
 import com.google.inject.Inject;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -71,13 +72,14 @@ public class FingerprintGenerator
     }
     catch (IOException e) {
       log.error(
-          "Exception generating fingerprint for payload [%s], datasource [%s], 
version [%s] with stacktrace [%s].",
-          schemaPayload,
-          dataSource,
-          version,
-          e
+          e,
+          "Exception generating schema fingerprint (version[%d]) for 
datasource[%s], payload[%s].",
+          version, dataSource, schemaPayload
+      );
+      throw DruidException.defensive(
+          "Could not generate schema fingerprint (version[%d]) for 
datasource[%s].",
+          dataSource, version
       );
-      throw new RuntimeException(e);
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to