cryptoe commented on code in PR #15817:
URL: https://github.com/apache/druid/pull/15817#discussion_r1574138422
##########
services/src/main/java/org/apache/druid/cli/CliPeon.java:
##########
@@ -220,31 +217,10 @@ protected List<? extends Module> getModules()
@Override
public void configure(Binder binder)
{
+
Review Comment:
MM less ingestion would need this check.
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -330,11 +330,13 @@ SegmentPublishResult commitSegmentsAndMetadata(
* @param appendSegmentToReplaceLock Map from append segment to the currently
* active REPLACE lock (if any) covering it
* @param taskAllocatorId allocator id of the task committing the
segments to be appended
+ * @param segmentSchemaMapping schema of append segments
*/
SegmentPublishResult commitAppendSegments(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
- @Nullable MinimalSegmentSchemas minimalSegmentSchemas
+ String taskAllocatorId,
Review Comment:
Why is this required ?
There are no javadocs for this.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java:
##########
@@ -192,7 +193,8 @@ public Pair<Integer, ReadableInput> apply(ReadableInput
readableInput)
frameContext.indexMerger(),
meters,
parseExceptionHandler,
- true
+ true,
+ CentralizedDatasourceSchemaConfig.create(false)
Review Comment:
MSQ does not support centralized data source schema yet. I think we should
put his comment here.
##########
processing/src/test/java/org/apache/druid/segment/column/SegmentSchemaMappingTest.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.SegmentMetadata;
+import org.apache.druid.segment.SegmentSchemaMapping;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.timeline.SegmentId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class SegmentSchemaMappingTest
+{
+ static {
+ NullHandling.initializeForTests();
+ }
+
+ private ObjectMapper mapper = TestHelper.makeJsonMapper();
+
+ @Test
+ public void testSerde() throws IOException
+ {
+ RowSignature rowSignature = RowSignature.builder().add("c",
ColumnType.FLOAT).build();
Review Comment:
The correct test case of a serde is to use a json string and then try to
de-serialize it to an object and see if we break backward compatibility if a
code change is there. The current UT will always pass :).
##########
services/src/main/java/org/apache/druid/cli/ServerRunnable.java:
##########
@@ -197,4 +204,31 @@ public void stop()
return new Child();
}
}
+
+ protected void validateCentralizedDatasourceSchemaConfig(Properties
properties)
Review Comment:
This can be static as well.
##########
server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java:
##########
@@ -45,18 +51,27 @@ public FingerprintGenerator(ObjectMapper objectMapper)
/**
* Generates fingerprint or hash string for an object using SHA-256 hash
algorithm.
*/
- public String generateFingerprint(Object payload)
+ @SuppressWarnings("UnstableApiUsage")
+ public String generateFingerprint(SchemaPayload schemaPayload, String
dataSource, int version)
Review Comment:
Dependency update might cause issues.
##########
server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java:
##########
@@ -45,18 +51,27 @@ public FingerprintGenerator(ObjectMapper objectMapper)
/**
* Generates fingerprint or hash string for an object using SHA-256 hash
algorithm.
*/
- public String generateFingerprint(Object payload)
+ @SuppressWarnings("UnstableApiUsage")
+ public String generateFingerprint(SchemaPayload schemaPayload, String
dataSource, int version)
Review Comment:
This should have UT's so that we can assert the changes.
##########
processing/src/test/java/org/apache/druid/segment/column/SegmentSchemaMappingTest.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.SegmentMetadata;
+import org.apache.druid.segment.SegmentSchemaMapping;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.timeline.SegmentId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class SegmentSchemaMappingTest
+{
+ static {
+ NullHandling.initializeForTests();
+ }
+
+ private ObjectMapper mapper = TestHelper.makeJsonMapper();
+
+ @Test
+ public void testSerde() throws IOException
+ {
+ RowSignature rowSignature = RowSignature.builder().add("c",
ColumnType.FLOAT).build();
Review Comment:
Please mark this as a followup for the PR.
##########
docs/operations/metrics.md:
##########
@@ -75,6 +75,12 @@ Most metric values reset each emission period, as specified
in `druid.monitoring
|`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch
datasource schema.||
|`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch
datasource schema.||
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch
datasource schema.||
+|`metadatacache/backfill/count`|Number of segments for which schema was back
filled in the database.|`dataSource`|
+|`schemacache/realtime/size`|Number of realtime segments for which schema is
cached.||Depends on the number of realtime segments.|
+|`schemacache/finalizedSegmentMetadata/size`|Number of finalized segments for
which schema metadata is cached.||Depends on the number of segments in the
cluster.|
Review Comment:
Nit: this should be count and not size. We can do this change as a followup.
##########
server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java:
##########
@@ -175,7 +175,7 @@ public void stop()
public void leaderStart()
{
- log.info("%s starting cache initialization.", getClass().getSimpleName());
+ log.info("Initializing cache.");
Review Comment:
I would recommend adding the name of the class so I can search
cache %s in the logs :). It makes it easier for me to search thought tons of
logs. since each loggers can have its own format.
Nit: can be done in a followup.
##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java:
##########
@@ -123,12 +125,17 @@ public void uninitialize()
log.info("[%s] is uninitializing.", getClass().getSimpleName());
initialized.set(new CountDownLatch(1));
- finalizedSegmentStats = ImmutableMap.of();
- finalizedSegmentSchema.clear();
+ finalizedSegmentMetadata = ImmutableMap.of();
+ finalizedSegmentSchema = ImmutableMap.of();
Review Comment:
Can this cause threadSafety issues if we change the reference ?
##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java:
##########
@@ -106,10 +108,10 @@ public SegmentSchemaCache(ServiceEmitter emitter)
public void setInitialized()
{
- log.info("[%s] initializing.", getClass().getSimpleName());
+ log.info("Initializing SegmentSchemaCache.");
Review Comment:
This should have isInitalized() on the top no ?
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1990,84 +1868,52 @@ public int deletePendingSegments(String dataSource)
);
}
- private boolean isSchemaPresent(MinimalSegmentSchemas minimalSegmentSchemas)
+ private boolean shouldPersistSchema(SegmentSchemaMapping
segmentSchemaMapping)
{
- return publishSchema()
- && minimalSegmentSchemas != null
- && minimalSegmentSchemas.isNonEmpty();
+ return schemaPersistEnabled
+ && segmentSchemaMapping != null
+ && segmentSchemaMapping.isNonEmpty();
}
- private Map<String, Long> persistSchema(
+ private void persistSchema(
final Handle handle,
final Set<DataSegment> segments,
- final MinimalSegmentSchemas minimalSegmentSchemas
+ final SegmentSchemaMapping segmentSchemaMapping
) throws JsonProcessingException
{
- if (minimalSegmentSchemas.getSchemaVersion() !=
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) {
+ if (segmentSchemaMapping.getSchemaVersion() !=
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) {
log.error(
- "Schema version [%d] doesn't match the current version [%d],
dropping the schema [%s].",
- minimalSegmentSchemas.getSchemaVersion(),
+ "Schema version [%d] doesn't match the current version [%d]. Not
persisting this schema [%s]. "
+ + "Schema for this segment will be poppulated by the schema backfill
job in Coordinator.",
Review Comment:
```suggestion
+ "Schema for this segment will be populated by the schema
back-fill job in Coordinator.",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]