This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/30.0.0 by this push:
new fb86ada253d [Backport] Fix backwards compatibility with centralized schema config in partial_index_merge tasks (#16566)
fb86ada253d is described below
commit fb86ada253d079213b7e4bc337d30b166ace5939
Author: Rishabh Singh <[email protected]>
AuthorDate: Thu Jun 6 21:06:02 2024 +0530
[Backport] Fix backwards compatibility with centralized schema config in partial_index_merge tasks (#16566)
* Fix backwards compatibility with centralized schema config in partial_index_merge tasks (#16556)
* Handle null values of centralized schema config in PartialMergeTask
* Fix checkstyle
* Do not pass centralized schema config from supervisor task to sub-tasks
* Do not pass ObjectMapper in constructor of task
* Fix logs
* Fix tests
* Revert log line to debug (#16565)
---------
Co-authored-by: Kashif Faraz <[email protected]>
---
.../batch/parallel/ParallelIndexSupervisorTask.java | 6 ++----
...alGenericSegmentMergeParallelIndexTaskRunner.java | 14 ++------------
.../parallel/PartialGenericSegmentMergeTask.java | 20 ++------------------
.../task/batch/parallel/PartialSegmentMergeTask.java | 14 ++++----------
.../parallel/PartialGenericSegmentMergeTaskTest.java | 9 ++-------
.../metadata/AbstractSegmentMetadataCache.java | 2 +-
.../druid/segment/metadata/FingerprintGenerator.java | 14 ++++++++------
7 files changed, 21 insertions(+), 58 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index e2c0681d001..4ca0f1ff80d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -441,9 +441,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
ingestionSchema.getDataSchema(),
ioConfigs,
ingestionSchema.getTuningConfig(),
- getContext(),
- toolbox.getJsonMapper(),
- toolbox.getCentralizedTableSchemaConfig()
+ getContext()
);
}
@@ -1620,7 +1618,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
for (PushedSegmentsReport pushedSegmentsReport :
completedSubtaskReports.values()) {
TaskReport.ReportMap taskReport = pushedSegmentsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) {
- LOG.warn("Received an empty report from subtask[%s]" +
pushedSegmentsReport.getTaskId());
+ LOG.warn("Received an empty report from sub-task[%s].",
pushedSegmentsReport.getTaskId());
continue;
}
RowIngestionMetersTotals rowIngestionMetersTotals =
getBuildSegmentsStatsFromTaskReport(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java
index 0c743d1f186..8babf50d826 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java
@@ -19,12 +19,10 @@
package org.apache.druid.indexing.common.task.batch.parallel;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import java.util.Iterator;
import java.util.List;
@@ -40,8 +38,6 @@ class PartialGenericSegmentMergeParallelIndexTaskRunner
private final DataSchema dataSchema;
private final List<PartialSegmentMergeIOConfig> mergeIOConfigs;
- private final CentralizedDatasourceSchemaConfig
centralizedDatasourceSchemaConfig;
- private final ObjectMapper mapper;
PartialGenericSegmentMergeParallelIndexTaskRunner(
TaskToolbox toolbox,
@@ -51,17 +47,13 @@ class PartialGenericSegmentMergeParallelIndexTaskRunner
DataSchema dataSchema,
List<PartialSegmentMergeIOConfig> mergeIOConfigs,
ParallelIndexTuningConfig tuningConfig,
- Map<String, Object> context,
- ObjectMapper mapper,
- CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+ Map<String, Object> context
)
{
super(toolbox, taskId, groupId, baseSubtaskSpecName, tuningConfig,
context);
this.dataSchema = dataSchema;
this.mergeIOConfigs = mergeIOConfigs;
- this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
- this.mapper = mapper;
}
@Override
@@ -110,9 +102,7 @@ class PartialGenericSegmentMergeParallelIndexTaskRunner
subtaskSpecId,
numAttempts,
ingestionSpec,
- getContext(),
- centralizedDatasourceSchemaConfig,
- mapper
+ getContext()
);
}
};
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java
index be44fb282ef..989f0a77daa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java
@@ -19,11 +19,9 @@
package org.apache.druid.indexing.common.task.batch.parallel;
-import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableSet;
@@ -31,7 +29,6 @@ import com.google.common.collect.Table;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.partition.BuildingShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
@@ -53,8 +50,6 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Buil
private final PartialSegmentMergeIngestionSpec ingestionSchema;
private final Table<Interval, Integer, BuildingShardSpec<?>>
intervalAndIntegerToShardSpec;
- private final CentralizedDatasourceSchemaConfig
centralizedDatasourceSchemaConfig;
-
@JsonCreator
public PartialGenericSegmentMergeTask(
// id shouldn't be null except when this task is created by
ParallelIndexSupervisorTask
@@ -66,9 +61,7 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Buil
@JsonProperty("subtaskSpecId") @Nullable final String subtaskSpecId,
@JsonProperty("numAttempts") final int numAttempts, // zero-based
counting
@JsonProperty("spec") final PartialSegmentMergeIngestionSpec
ingestionSchema,
- @JsonProperty("context") final Map<String, Object> context,
- @JsonProperty("centralizedDatasourceSchemaConfig")
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
- @JacksonInject ObjectMapper mapper
+ @JsonProperty("context") final Map<String, Object> context
)
{
super(
@@ -81,12 +74,9 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Buil
ingestionSchema.getIOConfig(),
ingestionSchema.getTuningConfig(),
numAttempts,
- context,
- mapper,
- centralizedDatasourceSchemaConfig
+ context
);
- this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
this.ingestionSchema = ingestionSchema;
this.intervalAndIntegerToShardSpec = createIntervalAndIntegerToShardSpec(
ingestionSchema.getIOConfig().getPartitionLocations()
@@ -127,12 +117,6 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Buil
return ingestionSchema;
}
- @JsonProperty("centralizedDatasourceSchemaConfig")
- private CentralizedDatasourceSchemaConfig
getCentralizedDatasourceSchemaConfig()
- {
- return centralizedDatasourceSchemaConfig;
- }
-
@Override
public String getType()
{
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index 7f0208417ff..17d9936c685 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -20,7 +20,6 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
@@ -84,8 +83,6 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollu
private final PartialSegmentMergeIOConfig ioConfig;
private final int numAttempts;
private final String subtaskSpecId;
- private final CentralizedDatasourceSchemaConfig
centralizedDatasourceSchemaConfig;
- private final FingerprintGenerator fingerprintGenerator;
PartialSegmentMergeTask(
// id shouldn't be null except when this task is created by
ParallelIndexSupervisorTask
@@ -98,9 +95,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollu
PartialSegmentMergeIOConfig ioConfig,
ParallelIndexTuningConfig tuningConfig,
final int numAttempts, // zero-based counting
- final Map<String, Object> context,
- final ObjectMapper mapper,
- final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+ final Map<String, Object> context
)
{
super(
@@ -120,8 +115,6 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollu
this.subtaskSpecId = subtaskSpecId;
this.ioConfig = ioConfig;
this.numAttempts = numAttempts;
- this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
- this.fingerprintGenerator = new FingerprintGenerator(mapper);
}
@JsonProperty
@@ -265,6 +258,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollu
final Set<DataSegment> pushedSegments = new HashSet<>();
final SegmentSchemaMapping segmentSchemaMapping = new
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
+ final FingerprintGenerator fingerprintGenerator = new
FingerprintGenerator(toolbox.getJsonMapper());
for (Entry<Interval, Int2ObjectMap<List<File>>> entryPerInterval :
intervalToUnzippedFiles.entrySet()) {
final Interval interval = entryPerInterval.getKey();
for (Int2ObjectMap.Entry<List<File>> entryPerBucketId :
entryPerInterval.getValue().int2ObjectEntrySet()) {
@@ -315,7 +309,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollu
long pushFinishTime = System.nanoTime();
pushedSegments.add(segment);
- if (centralizedDatasourceSchemaConfig.isEnabled()) {
+ if (toolbox.getCentralizedTableSchemaConfig().isEnabled()) {
SchemaPayloadPlus schemaPayloadPlus =
TaskSegmentSchemaUtil.getSegmentSchema(mergedFileAndDimensionNames.lhs,
toolbox.getIndexIO());
segmentSchemaMapping.addSchema(
@@ -341,7 +335,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollu
);
}
}
- if (centralizedDatasourceSchemaConfig.isEnabled()) {
+ if (toolbox.getCentralizedTableSchemaConfig().isEnabled()) {
LOG.info("SegmentSchema for the pushed segments is [%s]",
segmentSchemaMapping);
}
return new DataSegmentsWithSchemas(pushedSegments,
segmentSchemaMapping.isNonEmpty() ? segmentSchemaMapping : null);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
index d35bba9b4ac..a51856f7353 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
@@ -104,9 +103,7 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup
ParallelIndexTestingFactory.SUBTASK_SPEC_ID,
ParallelIndexTestingFactory.NUM_ATTEMPTS,
ingestionSpec,
- ParallelIndexTestingFactory.CONTEXT,
- CentralizedDatasourceSchemaConfig.create(),
- null
+ ParallelIndexTestingFactory.CONTEXT
);
}
@@ -143,9 +140,7 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup
.partitionsSpec(partitionsSpec)
.build()
),
- ParallelIndexTestingFactory.CONTEXT,
- CentralizedDatasourceSchemaConfig.create(),
- null
+ ParallelIndexTestingFactory.CONTEXT
);
}
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
index 5c38bb7aaaa..9cb2297db82 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
@@ -733,7 +733,7 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
log.warn("Got analysis for segment [%s] we didn't ask for,
ignoring.", analysis.getId());
} else {
final RowSignature rowSignature = analysisToRowSignature(analysis);
- log.info("Segment[%s] has signature[%s].", segmentId, rowSignature);
+ log.debug("Segment[%s] has signature[%s].", segmentId, rowSignature);
if (segmentMetadataQueryResultHandler(dataSource, segmentId,
rowSignature, analysis)) {
retVal.add(segmentId);
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java
index ffaccd09bdf..f1cb8ea0a50 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java
@@ -25,6 +25,7 @@ import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
+import org.apache.druid.error.DruidException;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -71,13 +72,14 @@ public class FingerprintGenerator
}
catch (IOException e) {
log.error(
- "Exception generating fingerprint for payload [%s], datasource [%s],
version [%s] with stacktrace [%s].",
- schemaPayload,
- dataSource,
- version,
- e
+ e,
+ "Exception generating schema fingerprint (version[%d]) for
datasource[%s], payload[%s].",
+ version, dataSource, schemaPayload
+ );
+ throw DruidException.defensive(
+ "Could not generate schema fingerprint (version[%d]) for
datasource[%s].",
+ dataSource, version
);
- throw new RuntimeException(e);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org