cryptoe commented on code in PR #15817: URL: https://github.com/apache/druid/pull/15817#discussion_r1565189848
########## server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.druid.guice.LazySingleton; + +import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +/** + * Utility to generate fingerprint for an object. + */ +@LazySingleton +public class FingerprintGenerator +{ + private final ObjectMapper objectMapper; + + @Inject + public FingerprintGenerator(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } + + public String generateFingerprint(Object payload) Review Comment: nit : Please add some java docs here. ########## server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.druid.guice.LazySingleton; + +import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +/** + * Utility to generate fingerprint for an object. + */ +@LazySingleton +public class FingerprintGenerator +{ + private final ObjectMapper objectMapper; + + @Inject + public FingerprintGenerator(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } + + public String generateFingerprint(Object payload) + { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] serializedObj = objectMapper.writeValueAsBytes(payload); + + digest.update(serializedObj); + byte[] hashBytes = digest.digest(); + return bytesToHex(hashBytes); + } + catch (NoSuchAlgorithmException | IOException e) { + throw new RuntimeException("Error generating object fingerprint. ", e); + } + } + + private String bytesToHex(byte[] bytes) + { + StringBuilder hexString = new StringBuilder(); + for (byte b : bytes) { + String hex = Integer.toHexString(0xff & b); + if (hex.length() == 1) { + hexString.append('0'); Review Comment: Why is this required ? 
########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -1947,13 +1992,46 @@ public int deletePendingSegments(String dataSource) private Set<DataSegment> announceHistoricalSegmentBatch( final Handle handle, final Set<DataSegment> segments, - final Set<DataSegment> usedSegments + final Set<DataSegment> usedSegments, + @Nullable final MinimalSegmentSchemas minimalSegmentSchemas ) throws IOException { final Set<DataSegment> toInsertSegments = new HashSet<>(); + Map<String, Long> fingerprintSchemaIdMap = null; + boolean schemaPresent = false; try { + if (!publishSchema()) { + log.info("Task schema publish is disabled."); Review Comment: Why is this logged at info level? Isn't this triggered on each commit? ########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -1947,13 +1992,46 @@ public int deletePendingSegments(String dataSource) private Set<DataSegment> announceHistoricalSegmentBatch( final Handle handle, final Set<DataSegment> segments, - final Set<DataSegment> usedSegments + final Set<DataSegment> usedSegments, + @Nullable final MinimalSegmentSchemas minimalSegmentSchemas ) throws IOException { final Set<DataSegment> toInsertSegments = new HashSet<>(); + Map<String, Long> fingerprintSchemaIdMap = null; + boolean schemaPresent = false; try { + if (!publishSchema()) { + log.info("Task schema publish is disabled."); + } + if (publishSchema() + && minimalSegmentSchemas != null + && minimalSegmentSchemas.isNonEmpty() + ) { + String dataSource = segments.stream().iterator().next().getDataSource(); + schemaPresent = true; + log.info("Persisting segment schema: [%s].", minimalSegmentSchemas); Review Comment: Are you betting that, since there would be fewer schemas, we can get away with info logging? 
########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -1291,16 +1323,24 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( } final String dataSource = appendSegments.iterator().next().getDataSource(); - final Set<DataSegment> segmentIdsForNewVersions = connector.retryTransaction( + final Map<SegmentId, Set<DataSegment>> segmentIdsForNewVersions = connector.retryTransaction( (handle, transactionStatus) -> createNewIdsForAppendSegments(handle, dataSource, appendSegments), 0, SQLMetadataConnector.DEFAULT_MAX_TRIES ); + Map<SegmentId, SegmentId> newVersionSegmentToParent = new HashMap<>(); + + for (Map.Entry<SegmentId, Set<DataSegment>> segmentIdToNewVersions : segmentIdsForNewVersions.entrySet()) { + for (DataSegment segment : segmentIdToNewVersions.getValue()) { + newVersionSegmentToParent.put(segment.getId(), segmentIdToNewVersions.getKey()); + } Review Comment: While you are iterating, you can also populate the allSegmentsToInsert list. ########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -1291,16 +1323,24 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( } final String dataSource = appendSegments.iterator().next().getDataSource(); - final Set<DataSegment> segmentIdsForNewVersions = connector.retryTransaction( + final Map<SegmentId, Set<DataSegment>> segmentIdsForNewVersions = connector.retryTransaction( Review Comment: Why is this change required ? ########## server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java: ########## @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.inject.Inject; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.apache.druid.segment.SchemaPayload; +import org.apache.druid.segment.SchemaPayloadPlus; +import org.apache.druid.timeline.SegmentId; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.PreparedBatch; +import org.skife.jdbi.v2.TransactionCallback; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Handles segment schema persistence and cleanup. 
+ */ +@LazySingleton +public class SegmentSchemaManager +{ + private static final EmittingLogger log = new EmittingLogger(SegmentSchemaManager.class); + private static final int DB_ACTION_PARTITION_SIZE = 100; + private final MetadataStorageTablesConfig dbTables; + private final ObjectMapper jsonMapper; + private final SQLMetadataConnector connector; + private final FingerprintGenerator fingerprintGenerator; + + @Inject + public SegmentSchemaManager( + MetadataStorageTablesConfig dbTables, + ObjectMapper jsonMapper, + SQLMetadataConnector connector, + FingerprintGenerator fingerprintGenerator + ) + { + this.dbTables = dbTables; + this.jsonMapper = jsonMapper; + this.connector = connector; + this.fingerprintGenerator = fingerprintGenerator; + } + + public List<Long> identifyReferencedUnusedSchema() + { + return connector.retryWithHandle( + handle -> + handle.createQuery( + StringUtils.format( + "SELECT DISTINCT(schema_id) FROM %s WHERE used = true AND schema_id IN (SELECT id FROM %s WHERE used = false)", + dbTables.getSegmentsTable(), dbTables.getSegmentSchemasTable() + )) + .mapTo(Long.class) + .list() + ); + } + + public int markSchemaUsed(List<Long> schemaIds) + { + String inClause = schemaIds.stream().map(Object::toString).collect(Collectors.joining(",")); + + return connector.retryWithHandle( + handle -> + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = true, used_status_last_updated = :created_date" + + " WHERE id IN (%s)", + dbTables.getSegmentSchemasTable(), inClause + ) + ) + .bind("created_date", DateTimes.nowUtc().toString()) + .execute() + ); + } + + public int deleteSchemasOlderThan(long timestamp) + { + String dateTime = DateTimes.utc(timestamp).toString(); + return connector.retryWithHandle( + handle -> handle.createStatement( + StringUtils.format( + "DELETE FROM %s WHERE used = false AND used_status_last_updated < :date_time", + dbTables.getSegmentSchemasTable() + )) + .bind("date_time", dateTime) + .execute()); + } + + public 
int identifyAndMarkSchemaUnused() + { + return connector.retryWithHandle( + handle -> + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = false, used_status_last_updated = :created_date WHERE used != false " Review Comment: Why is this named `created_date`? Can we rename it to `now`? ########## server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java: ########## @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.inject.Inject; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.apache.druid.segment.SchemaPayload; +import org.apache.druid.segment.SchemaPayloadPlus; +import org.apache.druid.timeline.SegmentId; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.PreparedBatch; +import org.skife.jdbi.v2.TransactionCallback; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Handles segment schema persistence and cleanup. 
+ */ +@LazySingleton +public class SegmentSchemaManager +{ + private static final EmittingLogger log = new EmittingLogger(SegmentSchemaManager.class); + private static final int DB_ACTION_PARTITION_SIZE = 100; + private final MetadataStorageTablesConfig dbTables; + private final ObjectMapper jsonMapper; + private final SQLMetadataConnector connector; + private final FingerprintGenerator fingerprintGenerator; + + @Inject + public SegmentSchemaManager( + MetadataStorageTablesConfig dbTables, + ObjectMapper jsonMapper, + SQLMetadataConnector connector, + FingerprintGenerator fingerprintGenerator + ) + { + this.dbTables = dbTables; + this.jsonMapper = jsonMapper; + this.connector = connector; + this.fingerprintGenerator = fingerprintGenerator; + } + + public List<Long> identifyReferencedUnusedSchema() + { + return connector.retryWithHandle( + handle -> + handle.createQuery( + StringUtils.format( + "SELECT DISTINCT(schema_id) FROM %s WHERE used = true AND schema_id IN (SELECT id FROM %s WHERE used = false)", + dbTables.getSegmentsTable(), dbTables.getSegmentSchemasTable() + )) + .mapTo(Long.class) + .list() + ); + } + + public int markSchemaUsed(List<Long> schemaIds) + { + String inClause = schemaIds.stream().map(Object::toString).collect(Collectors.joining(",")); + + return connector.retryWithHandle( + handle -> + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = true, used_status_last_updated = :created_date" + + " WHERE id IN (%s)", + dbTables.getSegmentSchemasTable(), inClause + ) + ) + .bind("created_date", DateTimes.nowUtc().toString()) + .execute() + ); + } + + public int deleteSchemasOlderThan(long timestamp) + { + String dateTime = DateTimes.utc(timestamp).toString(); + return connector.retryWithHandle( + handle -> handle.createStatement( + StringUtils.format( + "DELETE FROM %s WHERE used = false AND used_status_last_updated < :date_time", + dbTables.getSegmentSchemasTable() + )) + .bind("date_time", dateTime) + .execute()); + } + + public 
int identifyAndMarkSchemaUnused() + { + return connector.retryWithHandle( + handle -> + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = false, used_status_last_updated = :created_date WHERE used != false " + + "AND id NOT IN (SELECT DISTINCT(schema_id) FROM %s WHERE used=true AND schema_id IS NOT NULL)", + dbTables.getSegmentSchemasTable(), + dbTables.getSegmentsTable() + ) + ) + .bind("created_date", DateTimes.nowUtc().toString()) + .execute()); + } + + public String generateSchemaPayloadFingerprint(SchemaPayload payload) + { + return fingerprintGenerator.generateFingerprint(payload); + } + + /** + * Persist segment schema and update segments in a transaction. + */ + public void persistSchemaAndUpdateSegmentsTable(String dataSource, List<SegmentSchemaMetadataPlus> segmentSchemas, String schemaVersion) + { + connector.retryTransaction((TransactionCallback<Void>) (handle, status) -> { + Map<String, SchemaPayload> schemaPayloadMap = new HashMap<>(); + + for (SegmentSchemaMetadataPlus segmentSchema : segmentSchemas) { + schemaPayloadMap.put( + segmentSchema.getFingerprint(), + segmentSchema.getSegmentSchemaMetadata().getSchemaPayload() + ); + } + persistSegmentSchema(handle, dataSource, schemaPayloadMap, schemaVersion); + updateSegmentWithSchemaInformation(handle, dataSource, schemaVersion, segmentSchemas); + + return null; + }, 1, 3); + } + + /** + * Persist unique segment schema in the DB. + */ + public void persistSegmentSchema( + Handle handle, + String dataSource, + Map<String, SchemaPayload> fingerprintSchemaPayloadMap, + String schemaVersion + ) throws JsonProcessingException + { + try { + // Filter already existing schema + Map<Boolean, Set<String>> existingFingerprintsAndUsedStatus = fingerprintExistBatch(handle, dataSource, schemaVersion, fingerprintSchemaPayloadMap.keySet()); + Set<String> usedExistingFingerprints = existingFingerprintsAndUsedStatus.containsKey(true) ? 
existingFingerprintsAndUsedStatus.get(true) : new HashSet<>(); + Set<String> unusedExistingFingerprints = existingFingerprintsAndUsedStatus.containsKey(false) ? existingFingerprintsAndUsedStatus.get(false) : new HashSet<>(); + Set<String> existingFingerprints = Sets.union(usedExistingFingerprints, unusedExistingFingerprints); + if (existingFingerprints.size() > 0) { + log.info( + "Found already existing schema in the DB for dataSource [%1$s]. Used fingeprints: [%2$s], Unused fingerprints: [%3$s]", + dataSource, + usedExistingFingerprints, + unusedExistingFingerprints + ); + } + + // There is a possibility of race with schema cleanup Coordinator duty. + // The duty could delete the unused schema. We try to mark them used. + // However, if the duty succeeds in deleting it we just fail the transaction and retry. + // Since the deletion period would be at least > 1h, we are sure that the race wouldn't arise on retry. + // There is another race, wherein used schema could be marked as unused by the cleanup duty. + // The implication is that a segment could reference an unused schema. + // There is no functional problem as such, since the duty would itself mark those schema as used. 
+ if (unusedExistingFingerprints.size() > 0) { + // make the unused schema as used to prevent deletion + String inClause = unusedExistingFingerprints.stream() + .map(value -> "'" + StringEscapeUtils.escapeSql(value) + "'") + .collect(Collectors.joining(",")); + + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = true, used_status_last_updated = :created_date" + + " WHERE datasource = :datasource AND version = :version AND fingerprint IN (%s)", + dbTables.getSegmentSchemasTable(), inClause) + ) + .bind("created_date", DateTimes.nowUtc().toString()) + .bind("datasource", dataSource) + .bind("version", schemaVersion) + .execute(); + + // it is possible that a delete job could have actually deleted these unused schema + // in that scenario fail the transaction + Map<Boolean, Set<String>> unusedFingerprintStatus = fingerprintExistBatch(handle, dataSource, schemaVersion, unusedExistingFingerprints); Review Comment: IMHO this is not required since you are in a transaction lock . ########## server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java: ########## @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.inject.Inject; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.apache.druid.segment.SchemaPayload; +import org.apache.druid.segment.SchemaPayloadPlus; +import org.apache.druid.timeline.SegmentId; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.PreparedBatch; +import org.skife.jdbi.v2.TransactionCallback; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Handles segment schema persistence and cleanup. 
+ */ +@LazySingleton +public class SegmentSchemaManager +{ + private static final EmittingLogger log = new EmittingLogger(SegmentSchemaManager.class); + private static final int DB_ACTION_PARTITION_SIZE = 100; + private final MetadataStorageTablesConfig dbTables; + private final ObjectMapper jsonMapper; + private final SQLMetadataConnector connector; + private final FingerprintGenerator fingerprintGenerator; + + @Inject + public SegmentSchemaManager( + MetadataStorageTablesConfig dbTables, + ObjectMapper jsonMapper, + SQLMetadataConnector connector, + FingerprintGenerator fingerprintGenerator + ) + { + this.dbTables = dbTables; + this.jsonMapper = jsonMapper; + this.connector = connector; + this.fingerprintGenerator = fingerprintGenerator; + } + + public List<Long> identifyReferencedUnusedSchema() + { + return connector.retryWithHandle( + handle -> + handle.createQuery( + StringUtils.format( + "SELECT DISTINCT(schema_id) FROM %s WHERE used = true AND schema_id IN (SELECT id FROM %s WHERE used = false)", + dbTables.getSegmentsTable(), dbTables.getSegmentSchemasTable() + )) + .mapTo(Long.class) + .list() + ); + } + + public int markSchemaUsed(List<Long> schemaIds) + { + String inClause = schemaIds.stream().map(Object::toString).collect(Collectors.joining(",")); + + return connector.retryWithHandle( + handle -> + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = true, used_status_last_updated = :created_date" + + " WHERE id IN (%s)", + dbTables.getSegmentSchemasTable(), inClause + ) + ) + .bind("created_date", DateTimes.nowUtc().toString()) Review Comment: lets rename this to now. ########## server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java: ########## @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.inject.Inject; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.apache.druid.segment.SchemaPayload; +import org.apache.druid.segment.SchemaPayloadPlus; +import org.apache.druid.timeline.SegmentId; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.PreparedBatch; +import org.skife.jdbi.v2.TransactionCallback; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Handles segment schema persistence and cleanup. 
+ */ +@LazySingleton +public class SegmentSchemaManager +{ + private static final EmittingLogger log = new EmittingLogger(SegmentSchemaManager.class); + private static final int DB_ACTION_PARTITION_SIZE = 100; + private final MetadataStorageTablesConfig dbTables; + private final ObjectMapper jsonMapper; + private final SQLMetadataConnector connector; + private final FingerprintGenerator fingerprintGenerator; + + @Inject + public SegmentSchemaManager( + MetadataStorageTablesConfig dbTables, + ObjectMapper jsonMapper, + SQLMetadataConnector connector, + FingerprintGenerator fingerprintGenerator + ) + { + this.dbTables = dbTables; + this.jsonMapper = jsonMapper; + this.connector = connector; + this.fingerprintGenerator = fingerprintGenerator; + } + + public List<Long> identifyReferencedUnusedSchema() + { + return connector.retryWithHandle( + handle -> + handle.createQuery( + StringUtils.format( + "SELECT DISTINCT(schema_id) FROM %s WHERE used = true AND schema_id IN (SELECT id FROM %s WHERE used = false)", + dbTables.getSegmentsTable(), dbTables.getSegmentSchemasTable() + )) + .mapTo(Long.class) + .list() + ); + } + + public int markSchemaUsed(List<Long> schemaIds) + { + String inClause = schemaIds.stream().map(Object::toString).collect(Collectors.joining(",")); + + return connector.retryWithHandle( + handle -> + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = true, used_status_last_updated = :created_date" + + " WHERE id IN (%s)", + dbTables.getSegmentSchemasTable(), inClause + ) + ) + .bind("created_date", DateTimes.nowUtc().toString()) + .execute() + ); + } + + public int deleteSchemasOlderThan(long timestamp) + { + String dateTime = DateTimes.utc(timestamp).toString(); + return connector.retryWithHandle( + handle -> handle.createStatement( + StringUtils.format( + "DELETE FROM %s WHERE used = false AND used_status_last_updated < :date_time", + dbTables.getSegmentSchemasTable() + )) + .bind("date_time", dateTime) + .execute()); + } + + public 
int identifyAndMarkSchemaUnused() + { + return connector.retryWithHandle( + handle -> + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = false, used_status_last_updated = :created_date WHERE used != false " + + "AND id NOT IN (SELECT DISTINCT(schema_id) FROM %s WHERE used=true AND schema_id IS NOT NULL)", + dbTables.getSegmentSchemasTable(), + dbTables.getSegmentsTable() + ) + ) + .bind("created_date", DateTimes.nowUtc().toString()) + .execute()); + } + + public String generateSchemaPayloadFingerprint(SchemaPayload payload) + { + return fingerprintGenerator.generateFingerprint(payload); + } + + /** + * Persist segment schema and update segments in a transaction. + */ + public void persistSchemaAndUpdateSegmentsTable(String dataSource, List<SegmentSchemaMetadataPlus> segmentSchemas, String schemaVersion) + { + connector.retryTransaction((TransactionCallback<Void>) (handle, status) -> { + Map<String, SchemaPayload> schemaPayloadMap = new HashMap<>(); + + for (SegmentSchemaMetadataPlus segmentSchema : segmentSchemas) { + schemaPayloadMap.put( + segmentSchema.getFingerprint(), + segmentSchema.getSegmentSchemaMetadata().getSchemaPayload() + ); + } + persistSegmentSchema(handle, dataSource, schemaPayloadMap, schemaVersion); + updateSegmentWithSchemaInformation(handle, dataSource, schemaVersion, segmentSchemas); + + return null; + }, 1, 3); + } + + /** + * Persist unique segment schema in the DB. + */ + public void persistSegmentSchema( + Handle handle, + String dataSource, + Map<String, SchemaPayload> fingerprintSchemaPayloadMap, + String schemaVersion + ) throws JsonProcessingException + { + try { + // Filter already existing schema + Map<Boolean, Set<String>> existingFingerprintsAndUsedStatus = fingerprintExistBatch(handle, dataSource, schemaVersion, fingerprintSchemaPayloadMap.keySet()); + Set<String> usedExistingFingerprints = existingFingerprintsAndUsedStatus.containsKey(true) ? 
existingFingerprintsAndUsedStatus.get(true) : new HashSet<>(); + Set<String> unusedExistingFingerprints = existingFingerprintsAndUsedStatus.containsKey(false) ? existingFingerprintsAndUsedStatus.get(false) : new HashSet<>(); + Set<String> existingFingerprints = Sets.union(usedExistingFingerprints, unusedExistingFingerprints); + if (existingFingerprints.size() > 0) { + log.info( Review Comment: I think we should put a metric here rather than info logging. We should adjust this after the patch is merged. ########## server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java: ########## @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.inject.Inject; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.apache.druid.segment.SchemaPayload; +import org.apache.druid.segment.SchemaPayloadPlus; +import org.apache.druid.timeline.SegmentId; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.PreparedBatch; +import org.skife.jdbi.v2.TransactionCallback; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Handles segment schema persistence and cleanup. 
+ */ +@LazySingleton +public class SegmentSchemaManager +{ + private static final EmittingLogger log = new EmittingLogger(SegmentSchemaManager.class); + private static final int DB_ACTION_PARTITION_SIZE = 100; + private final MetadataStorageTablesConfig dbTables; + private final ObjectMapper jsonMapper; + private final SQLMetadataConnector connector; + private final FingerprintGenerator fingerprintGenerator; + + @Inject + public SegmentSchemaManager( + MetadataStorageTablesConfig dbTables, + ObjectMapper jsonMapper, + SQLMetadataConnector connector, + FingerprintGenerator fingerprintGenerator + ) + { + this.dbTables = dbTables; + this.jsonMapper = jsonMapper; + this.connector = connector; + this.fingerprintGenerator = fingerprintGenerator; + } + + public List<Long> identifyReferencedUnusedSchema() + { + return connector.retryWithHandle( + handle -> + handle.createQuery( + StringUtils.format( + "SELECT DISTINCT(schema_id) FROM %s WHERE used = true AND schema_id IN (SELECT id FROM %s WHERE used = false)", + dbTables.getSegmentsTable(), dbTables.getSegmentSchemasTable() + )) + .mapTo(Long.class) + .list() + ); + } + + public int markSchemaUsed(List<Long> schemaIds) + { + String inClause = schemaIds.stream().map(Object::toString).collect(Collectors.joining(",")); + + return connector.retryWithHandle( + handle -> + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = true, used_status_last_updated = :created_date" + + " WHERE id IN (%s)", + dbTables.getSegmentSchemasTable(), inClause + ) + ) + .bind("created_date", DateTimes.nowUtc().toString()) + .execute() + ); + } + + public int deleteSchemasOlderThan(long timestamp) + { + String dateTime = DateTimes.utc(timestamp).toString(); + return connector.retryWithHandle( + handle -> handle.createStatement( + StringUtils.format( + "DELETE FROM %s WHERE used = false AND used_status_last_updated < :date_time", + dbTables.getSegmentSchemasTable() + )) + .bind("date_time", dateTime) + .execute()); + } + + public 
int identifyAndMarkSchemaUnused() + { + return connector.retryWithHandle( + handle -> + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = false, used_status_last_updated = :created_date WHERE used != false " + + "AND id NOT IN (SELECT DISTINCT(schema_id) FROM %s WHERE used=true AND schema_id IS NOT NULL)", + dbTables.getSegmentSchemasTable(), + dbTables.getSegmentsTable() + ) + ) + .bind("created_date", DateTimes.nowUtc().toString()) + .execute()); + } + + public String generateSchemaPayloadFingerprint(SchemaPayload payload) + { + return fingerprintGenerator.generateFingerprint(payload); + } + + /** + * Persist segment schema and update segments in a transaction. + */ + public void persistSchemaAndUpdateSegmentsTable(String dataSource, List<SegmentSchemaMetadataPlus> segmentSchemas, String schemaVersion) + { + connector.retryTransaction((TransactionCallback<Void>) (handle, status) -> { + Map<String, SchemaPayload> schemaPayloadMap = new HashMap<>(); + + for (SegmentSchemaMetadataPlus segmentSchema : segmentSchemas) { + schemaPayloadMap.put( + segmentSchema.getFingerprint(), + segmentSchema.getSegmentSchemaMetadata().getSchemaPayload() + ); + } + persistSegmentSchema(handle, dataSource, schemaPayloadMap, schemaVersion); + updateSegmentWithSchemaInformation(handle, dataSource, schemaVersion, segmentSchemas); + + return null; + }, 1, 3); + } + + /** + * Persist unique segment schema in the DB. + */ + public void persistSegmentSchema( + Handle handle, + String dataSource, + Map<String, SchemaPayload> fingerprintSchemaPayloadMap, + String schemaVersion + ) throws JsonProcessingException + { + try { + // Filter already existing schema + Map<Boolean, Set<String>> existingFingerprintsAndUsedStatus = fingerprintExistBatch(handle, dataSource, schemaVersion, fingerprintSchemaPayloadMap.keySet()); + Set<String> usedExistingFingerprints = existingFingerprintsAndUsedStatus.containsKey(true) ? 
existingFingerprintsAndUsedStatus.get(true) : new HashSet<>(); + Set<String> unusedExistingFingerprints = existingFingerprintsAndUsedStatus.containsKey(false) ? existingFingerprintsAndUsedStatus.get(false) : new HashSet<>(); + Set<String> existingFingerprints = Sets.union(usedExistingFingerprints, unusedExistingFingerprints); + if (existingFingerprints.size() > 0) { + log.info( + "Found already existing schema in the DB for dataSource [%1$s]. Used fingeprints: [%2$s], Unused fingerprints: [%3$s]", + dataSource, + usedExistingFingerprints, + unusedExistingFingerprints + ); + } + + // There is a possibility of race with schema cleanup Coordinator duty. + // The duty could delete the unused schema. We try to mark them used. + // However, if the duty succeeds in deleting it we just fail the transaction and retry. + // Since the deletion period would be at least > 1h, we are sure that the race wouldn't arise on retry. + // There is another race, wherein used schema could be marked as unused by the cleanup duty. + // The implication is that a segment could reference an unused schema. + // There is no functional problem as such, since the duty would itself mark those schema as used. 
+ if (unusedExistingFingerprints.size() > 0) { + // make the unused schema as used to prevent deletion + String inClause = unusedExistingFingerprints.stream() + .map(value -> "'" + StringEscapeUtils.escapeSql(value) + "'") + .collect(Collectors.joining(",")); + + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = true, used_status_last_updated = :created_date" + + " WHERE datasource = :datasource AND version = :version AND fingerprint IN (%s)", + dbTables.getSegmentSchemasTable(), inClause) + ) + .bind("created_date", DateTimes.nowUtc().toString()) + .bind("datasource", dataSource) + .bind("version", schemaVersion) + .execute(); + + // it is possible that a delete job could have actually deleted these unused schema + // in that scenario fail the transaction + Map<Boolean, Set<String>> unusedFingerprintStatus = fingerprintExistBatch(handle, dataSource, schemaVersion, unusedExistingFingerprints); + if (!unusedExistingFingerprints.equals(unusedFingerprintStatus.get(true))) { + throw new ISE("Failed to mark unused schema as used for datasource: [%s], fingerprints: [%s].", dataSource, unusedExistingFingerprints); + } + } + + Map<String, SchemaPayload> schemaPayloadToCreate = new HashMap<>(); + + for (Map.Entry<String, SchemaPayload> entry : fingerprintSchemaPayloadMap.entrySet()) { + if (!existingFingerprints.contains(entry.getKey())) { + schemaPayloadToCreate.put(entry.getKey(), entry.getValue()); + } + } + + if (schemaPayloadToCreate.isEmpty()) { + log.info("No schema to persist for dataSource [%s].", dataSource); + return; + } + + final List<List<String>> partitionedFingerprints = Lists.partition( + new ArrayList<>(schemaPayloadToCreate.keySet()), + DB_ACTION_PARTITION_SIZE + ); + + String insertSql = StringUtils.format( + "INSERT INTO %s (created_date, datasource, fingerprint, payload, used, used_status_last_updated, version) " + + "VALUES (:created_date, :datasource, :fingerprint, :payload, :used, :used_status_last_updated, :version)", + 
dbTables.getSegmentSchemasTable() + ); + + // insert schemas + PreparedBatch schemaInsertBatch = handle.prepareBatch(insertSql); + for (List<String> partition : partitionedFingerprints) { + for (String fingerprint : partition) { + final String now = DateTimes.nowUtc().toString(); + schemaInsertBatch.add() + .bind("created_date", now) + .bind("datasource", dataSource) + .bind("fingerprint", fingerprint) + .bind("payload", jsonMapper.writeValueAsBytes(fingerprintSchemaPayloadMap.get(fingerprint))) + .bind("used", true) + .bind("used_status_last_updated", now) + .bind("version", schemaVersion); + } + final int[] affectedRows = schemaInsertBatch.execute(); + final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1); + if (succeeded) { + log.info("Published schemas to DB: %s", partition); + } else { + final List<String> failedToPublish = + IntStream.range(0, partition.size()) + .filter(i -> affectedRows[i] != 1) + .mapToObj(partition::get) + .collect(Collectors.toList()); + throw new ISE("Failed to publish schemas to DB: %s", failedToPublish); + } + } + } + catch (Exception e) { + log.error("Exception inserting schemas to DB: %s", fingerprintSchemaPayloadMap); + throw e; + } + } + + /** + * Update segment with schemaId and numRows information. 
+ */ + public void updateSegmentWithSchemaInformation(Handle handle, String dataSource, String schemaVersion, List<SegmentSchemaMetadataPlus> batch) + { + log.debug("Updating segment with schema and numRows information: [%s].", batch); + + // fetch schemaId + Map<String, Long> fingerprintSchemaIdMap = + schemaIdFetchBatch( + handle, + dataSource, + schemaVersion, + batch + .stream() + .map(SegmentSchemaMetadataPlus::getFingerprint) + .collect(Collectors.toSet()) + ); + + log.debug("FingerprintSchemaIdMap: [%s].", fingerprintSchemaIdMap); + + // update schemaId and numRows in segments table + String updateSql = + StringUtils.format( + "UPDATE %s SET schema_id = :schema_id, num_rows = :num_rows WHERE id = :id", + dbTables.getSegmentsTable() + ); + + PreparedBatch segmentUpdateBatch = handle.prepareBatch(updateSql); + + List<List<SegmentSchemaMetadataPlus>> partitionedSegmentIds = + Lists.partition( + batch, + DB_ACTION_PARTITION_SIZE + ); + + for (List<SegmentSchemaMetadataPlus> partition : partitionedSegmentIds) { + for (SegmentSchemaMetadataPlus segmentSchema : batch) { + String fingerprint = segmentSchema.getFingerprint(); + if (!fingerprintSchemaIdMap.containsKey(fingerprint)) { + log.error( + "Fingerprint [%s] for segmentId [%s] and datasource [%s] is not associated with any schemaId.", + fingerprint, dataSource, segmentSchema.getSegmentId() + ); + continue; + } + + segmentUpdateBatch.add() + .bind("id", segmentSchema.getSegmentId().toString()) + .bind("schema_id", fingerprintSchemaIdMap.get(fingerprint)) + .bind("num_rows", segmentSchema.getSegmentSchemaMetadata().getNumRows()); + } + + final int[] affectedRows = segmentUpdateBatch.execute(); + final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1); + + if (succeeded) { + log.info("Updated segments with schemaId & numRows in the DB: %s", partition); + } else { + final List<String> failedToUpdate = + IntStream.range(0, partition.size()) + .filter(i -> 
affectedRows[i] != 1) + .mapToObj(partition::get) + .map(plus -> plus.getSegmentId().toString()) + .collect(Collectors.toList()); + throw new ISE("Failed to update segments with schema information: %s", failedToUpdate); + } + } + } + + private Map<Boolean, Set<String>> fingerprintExistBatch(Handle handle, String dataSource, String schemaVersion, Set<String> fingerprintsToInsert) + { + List<List<String>> partitionedFingerprints = Lists.partition( + new ArrayList<>(fingerprintsToInsert), + DB_ACTION_PARTITION_SIZE + ); + + Map<Boolean, Set<String>> existingFingerprints = new HashMap<>(); + for (List<String> fingerprintList : partitionedFingerprints) { + String fingerprints = fingerprintList.stream() + .map(fingerprint -> "'" + StringEscapeUtils.escapeSql(fingerprint) + "'") + .collect(Collectors.joining(",")); + handle.createQuery( + StringUtils.format( + "SELECT used, fingerprint FROM %s WHERE datasource = :datasource AND version = :version AND fingerprint IN (%s)", + dbTables.getSegmentSchemasTable(), fingerprints + ) + ) + .bind("datasource", dataSource) + .bind("version", schemaVersion) + .map((index, r, ctx) -> existingFingerprints.computeIfAbsent( + r.getBoolean(1), value -> new HashSet<>()).add(r.getString(2))) Review Comment: Let's add Javadoc explaining the used/unused nuance.
########## server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java: ########## @@ -340,6 +346,40 @@ tableName, getPayloadType(), getQuoteString(), getCollation() ); } + public void createSegmentsTableSchemaPersistenceEnabled(final String tableName) + { + createTable( + tableName, + ImmutableList.of( + StringUtils.format( + "CREATE TABLE %1$s (\n" + + " id VARCHAR(255) NOT NULL,\n" + + " dataSource VARCHAR(255) %4$s NOT NULL,\n" + + " created_date VARCHAR(255) NOT NULL,\n" + + " start VARCHAR(255) NOT NULL,\n" + + " %3$send%3$s VARCHAR(255) NOT NULL,\n" + + " partitioned BOOLEAN NOT NULL,\n" + + " version VARCHAR(255) NOT NULL,\n" + + " used BOOLEAN NOT NULL,\n" + + " payload %2$s NOT NULL,\n" + + " used_status_last_updated VARCHAR(255) NOT NULL,\n" + + " schema_id BIGINT,\n" + + " num_rows BIGINT,\n" + + " PRIMARY KEY (id),\n" + + " FOREIGN KEY(schema_id) REFERENCES %5$s(id)\n" Review Comment: I guess the foreign key is not required under the new design. ########## server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java: ########## @@ -340,6 +346,40 @@ tableName, getPayloadType(), getQuoteString(), getCollation() ); } + public void createSegmentsTableSchemaPersistenceEnabled(final String tableName) Review Comment: This should be part of createSegmentsTableOnly. All the complexity of adding new columns should stay in the string generation stage; this keeps the project maintainable in the future.
########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -1291,16 +1323,24 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( } final String dataSource = appendSegments.iterator().next().getDataSource(); - final Set<DataSegment> segmentIdsForNewVersions = connector.retryTransaction( + final Map<SegmentId, Set<DataSegment>> segmentIdsForNewVersions = connector.retryTransaction( Review Comment: nit rename parentSegmentIdToNewDataSegment ########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -2232,29 +2412,60 @@ private void insertIntoUpgradeSegmentsTable( } } - private List<DataSegment> retrieveSegmentsById(Handle handle, String datasource, Set<String> segmentIds) + private List<DataSegmentWithSchemaInformation> retrieveSegmentsById(Handle handle, String datasource, Set<String> segmentIds) { if (segmentIds.isEmpty()) { return Collections.emptyList(); } - return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) - .retrieveSegmentsById(datasource, segmentIds) - .stream() - .map(DataSegmentPlus::getDataSegment) - .collect(Collectors.toList()); + if (publishSchema()) { + return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) + .retrieveSegmentsWithSchemaById(datasource, segmentIds) + .stream() + .map(plus -> + new DataSegmentWithSchemaInformation( + plus.getDataSegment(), + plus.getSchemaId(), + plus.getNumRows() + ) + ) + .collect(Collectors.toList()); + } else { + return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) + .retrieveSegmentsById(datasource, segmentIds) + .stream() + .map(plus -> + new DataSegmentWithSchemaInformation( + plus.getDataSegment(), + plus.getSchemaId(), + plus.getNumRows() + ) + ) + .collect(Collectors.toList()); + } } private String buildSqlToInsertSegments() { - return StringUtils.format( - "INSERT INTO %1$s (id, 
dataSource, created_date, start, %2$send%2$s," - + " partitioned, version, used, payload, used_status_last_updated) " - + "VALUES (:id, :dataSource, :created_date, :start, :end," - + " :partitioned, :version, :used, :payload, :used_status_last_updated)", - dbTables.getSegmentsTable(), - connector.getQuoteString() - ); + if (publishSchema()) { Review Comment: Nit: lets inline the if in the future. ########## server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java: ########## @@ -540,6 +580,50 @@ protected void alterSegmentTableAddUsedFlagLastUpdated() } } + /** + * Adds new columns (used_status_last_updated, schema_id, segment_stats) to the "segments" table. + */ + protected void alterSegmentTableSchemaPersistenceEnabled() + { + final String tableName = tablesConfigSupplier.get().getSegmentsTable(); + + Map<String, String> columnNameTypes = new HashMap<>(); + columnNameTypes.put("used_status_last_updated", "varchar(255)"); + columnNameTypes.put("schema_id", "BIGINT"); + columnNameTypes.put("num_rows", "BIGINT"); + + Set<String> columnsToAdd = new HashSet<>(); + + for (String columnName : columnNameTypes.keySet()) { + if (tableHasColumn(tableName, columnName)) { + log.info("Table[%s] already has column[%s].", tableName, columnName); + } else { + columnsToAdd.add(columnName); + } + } + + List<String> alterCommands = new ArrayList<>(); + if (!columnsToAdd.isEmpty()) { + for (String columnName : columnsToAdd) { + alterCommands.add( Review Comment: It should be one query ########## server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java: ########## @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.inject.Inject; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.apache.druid.segment.SchemaPayload; +import org.apache.druid.segment.SchemaPayloadPlus; +import org.apache.druid.timeline.SegmentId; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.PreparedBatch; +import org.skife.jdbi.v2.TransactionCallback; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Handles segment schema persistence and cleanup. 
+ */ +@LazySingleton +public class SegmentSchemaManager +{ + private static final EmittingLogger log = new EmittingLogger(SegmentSchemaManager.class); + private static final int DB_ACTION_PARTITION_SIZE = 100; + private final MetadataStorageTablesConfig dbTables; + private final ObjectMapper jsonMapper; + private final SQLMetadataConnector connector; + private final FingerprintGenerator fingerprintGenerator; + + @Inject + public SegmentSchemaManager( + MetadataStorageTablesConfig dbTables, + ObjectMapper jsonMapper, + SQLMetadataConnector connector, + FingerprintGenerator fingerprintGenerator + ) + { + this.dbTables = dbTables; + this.jsonMapper = jsonMapper; + this.connector = connector; + this.fingerprintGenerator = fingerprintGenerator; + } + + public List<Long> identifyReferencedUnusedSchema() + { + return connector.retryWithHandle( + handle -> + handle.createQuery( + StringUtils.format( + "SELECT DISTINCT(schema_id) FROM %s WHERE used = true AND schema_id IN (SELECT id FROM %s WHERE used = false)", + dbTables.getSegmentsTable(), dbTables.getSegmentSchemasTable() + )) + .mapTo(Long.class) + .list() + ); + } + + public int markSchemaUsed(List<Long> schemaIds) + { + String inClause = schemaIds.stream().map(Object::toString).collect(Collectors.joining(",")); + + return connector.retryWithHandle( + handle -> + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = true, used_status_last_updated = :created_date" + + " WHERE id IN (%s)", + dbTables.getSegmentSchemasTable(), inClause + ) + ) + .bind("created_date", DateTimes.nowUtc().toString()) + .execute() + ); + } + + public int deleteSchemasOlderThan(long timestamp) + { + String dateTime = DateTimes.utc(timestamp).toString(); + return connector.retryWithHandle( + handle -> handle.createStatement( + StringUtils.format( + "DELETE FROM %s WHERE used = false AND used_status_last_updated < :date_time", + dbTables.getSegmentSchemasTable() + )) + .bind("date_time", dateTime) + .execute()); + } + + public 
int identifyAndMarkSchemaUnused() + { + return connector.retryWithHandle( + handle -> + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = false, used_status_last_updated = :created_date WHERE used != false " + + "AND id NOT IN (SELECT DISTINCT(schema_id) FROM %s WHERE used=true AND schema_id IS NOT NULL)", + dbTables.getSegmentSchemasTable(), + dbTables.getSegmentsTable() + ) + ) + .bind("created_date", DateTimes.nowUtc().toString()) + .execute()); + } + + public String generateSchemaPayloadFingerprint(SchemaPayload payload) + { + return fingerprintGenerator.generateFingerprint(payload); + } + + /** + * Persist segment schema and update segments in a transaction. + */ + public void persistSchemaAndUpdateSegmentsTable(String dataSource, List<SegmentSchemaMetadataPlus> segmentSchemas, String schemaVersion) + { + connector.retryTransaction((TransactionCallback<Void>) (handle, status) -> { + Map<String, SchemaPayload> schemaPayloadMap = new HashMap<>(); + + for (SegmentSchemaMetadataPlus segmentSchema : segmentSchemas) { + schemaPayloadMap.put( + segmentSchema.getFingerprint(), + segmentSchema.getSegmentSchemaMetadata().getSchemaPayload() + ); + } + persistSegmentSchema(handle, dataSource, schemaPayloadMap, schemaVersion); + updateSegmentWithSchemaInformation(handle, dataSource, schemaVersion, segmentSchemas); + + return null; + }, 1, 3); + } + + /** + * Persist unique segment schema in the DB. + */ + public void persistSegmentSchema( + Handle handle, + String dataSource, + Map<String, SchemaPayload> fingerprintSchemaPayloadMap, + String schemaVersion + ) throws JsonProcessingException + { + try { + // Filter already existing schema + Map<Boolean, Set<String>> existingFingerprintsAndUsedStatus = fingerprintExistBatch(handle, dataSource, schemaVersion, fingerprintSchemaPayloadMap.keySet()); + Set<String> usedExistingFingerprints = existingFingerprintsAndUsedStatus.containsKey(true) ? 
existingFingerprintsAndUsedStatus.get(true) : new HashSet<>(); + Set<String> unusedExistingFingerprints = existingFingerprintsAndUsedStatus.containsKey(false) ? existingFingerprintsAndUsedStatus.get(false) : new HashSet<>(); + Set<String> existingFingerprints = Sets.union(usedExistingFingerprints, unusedExistingFingerprints); + if (existingFingerprints.size() > 0) { + log.info( + "Found already existing schema in the DB for dataSource [%1$s]. Used fingeprints: [%2$s], Unused fingerprints: [%3$s]", + dataSource, + usedExistingFingerprints, + unusedExistingFingerprints + ); + } + + // There is a possibility of race with schema cleanup Coordinator duty. + // The duty could delete the unused schema. We try to mark them used. + // However, if the duty succeeds in deleting it we just fail the transaction and retry. Review Comment: ```suggestion // However, if the duty succeeds in deleting , this transaction failed due to consistency guarantees. In that case, we retry. ``` ########## processing/src/main/java/org/apache/druid/segment/MinimalSegmentSchemas.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Compact representation of segment schema for multiple segments. + */ +public class MinimalSegmentSchemas +{ + // Mapping of segmentId to segment level information like schema fingerprint and numRows. + private final Map<String, SegmentStats> segmentIdToMetadataMap; + + // Mapping of schema fingerprint to payload. + private final Map<String, SchemaPayload> schemaFingerprintToPayloadMap; + + private final String schemaVersion; Review Comment: version should be an Int so that we can compare across versions easily. ########## server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java: ########## @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.metadata; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.inject.Inject; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.apache.druid.segment.SchemaPayload; +import org.apache.druid.segment.SchemaPayloadPlus; +import org.apache.druid.timeline.SegmentId; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.PreparedBatch; +import org.skife.jdbi.v2.TransactionCallback; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Handles segment schema persistence and cleanup. 
+ */ +@LazySingleton +public class SegmentSchemaManager +{ + private static final EmittingLogger log = new EmittingLogger(SegmentSchemaManager.class); + private static final int DB_ACTION_PARTITION_SIZE = 100; + private final MetadataStorageTablesConfig dbTables; + private final ObjectMapper jsonMapper; + private final SQLMetadataConnector connector; + private final FingerprintGenerator fingerprintGenerator; + + @Inject + public SegmentSchemaManager( + MetadataStorageTablesConfig dbTables, + ObjectMapper jsonMapper, + SQLMetadataConnector connector, + FingerprintGenerator fingerprintGenerator + ) + { + this.dbTables = dbTables; + this.jsonMapper = jsonMapper; + this.connector = connector; + this.fingerprintGenerator = fingerprintGenerator; + } + + public List<Long> identifyReferencedUnusedSchema() + { + return connector.retryWithHandle( + handle -> + handle.createQuery( + StringUtils.format( + "SELECT DISTINCT(schema_id) FROM %s WHERE used = true AND schema_id IN (SELECT id FROM %s WHERE used = false)", + dbTables.getSegmentsTable(), dbTables.getSegmentSchemasTable() + )) + .mapTo(Long.class) + .list() + ); + } + + public int markSchemaUsed(List<Long> schemaIds) + { + String inClause = schemaIds.stream().map(Object::toString).collect(Collectors.joining(",")); + + return connector.retryWithHandle( + handle -> + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = true, used_status_last_updated = :created_date" + + " WHERE id IN (%s)", + dbTables.getSegmentSchemasTable(), inClause + ) + ) + .bind("created_date", DateTimes.nowUtc().toString()) + .execute() + ); + } + + public int deleteSchemasOlderThan(long timestamp) + { + String dateTime = DateTimes.utc(timestamp).toString(); + return connector.retryWithHandle( + handle -> handle.createStatement( + StringUtils.format( + "DELETE FROM %s WHERE used = false AND used_status_last_updated < :date_time", + dbTables.getSegmentSchemasTable() + )) + .bind("date_time", dateTime) + .execute()); + } + + public 
int identifyAndMarkSchemaUnused() + { + return connector.retryWithHandle( + handle -> + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = false, used_status_last_updated = :created_date WHERE used != false " + + "AND id NOT IN (SELECT DISTINCT(schema_id) FROM %s WHERE used=true AND schema_id IS NOT NULL)", + dbTables.getSegmentSchemasTable(), + dbTables.getSegmentsTable() + ) + ) + .bind("created_date", DateTimes.nowUtc().toString()) + .execute()); + } + + public String generateSchemaPayloadFingerprint(SchemaPayload payload) + { + return fingerprintGenerator.generateFingerprint(payload); + } + + /** + * Persist segment schema and update segments in a transaction. + */ + public void persistSchemaAndUpdateSegmentsTable(String dataSource, List<SegmentSchemaMetadataPlus> segmentSchemas, String schemaVersion) + { + connector.retryTransaction((TransactionCallback<Void>) (handle, status) -> { + Map<String, SchemaPayload> schemaPayloadMap = new HashMap<>(); + + for (SegmentSchemaMetadataPlus segmentSchema : segmentSchemas) { + schemaPayloadMap.put( + segmentSchema.getFingerprint(), + segmentSchema.getSegmentSchemaMetadata().getSchemaPayload() + ); + } + persistSegmentSchema(handle, dataSource, schemaPayloadMap, schemaVersion); + updateSegmentWithSchemaInformation(handle, dataSource, schemaVersion, segmentSchemas); + + return null; + }, 1, 3); + } + + /** + * Persist unique segment schema in the DB. + */ + public void persistSegmentSchema( + Handle handle, + String dataSource, + Map<String, SchemaPayload> fingerprintSchemaPayloadMap, + String schemaVersion + ) throws JsonProcessingException + { + try { + // Filter already existing schema + Map<Boolean, Set<String>> existingFingerprintsAndUsedStatus = fingerprintExistBatch(handle, dataSource, schemaVersion, fingerprintSchemaPayloadMap.keySet()); + Set<String> usedExistingFingerprints = existingFingerprintsAndUsedStatus.containsKey(true) ? 
existingFingerprintsAndUsedStatus.get(true) : new HashSet<>(); + Set<String> unusedExistingFingerprints = existingFingerprintsAndUsedStatus.containsKey(false) ? existingFingerprintsAndUsedStatus.get(false) : new HashSet<>(); + Set<String> existingFingerprints = Sets.union(usedExistingFingerprints, unusedExistingFingerprints); + if (existingFingerprints.size() > 0) { + log.info( + "Found already existing schema in the DB for dataSource [%1$s]. Used fingeprints: [%2$s], Unused fingerprints: [%3$s]", + dataSource, + usedExistingFingerprints, + unusedExistingFingerprints + ); + } + + // There is a possibility of race with schema cleanup Coordinator duty. + // The duty could delete the unused schema. We try to mark them used. + // However, if the duty succeeds in deleting it we just fail the transaction and retry. + // Since the deletion period would be at least > 1h, we are sure that the race wouldn't arise on retry. + // There is another race, wherein used schema could be marked as unused by the cleanup duty. + // The implication is that a segment could reference an unused schema. + // There is no functional problem as such, since the duty would itself mark those schema as used. 
+ if (unusedExistingFingerprints.size() > 0) { + // make the unused schema as used to prevent deletion + String inClause = unusedExistingFingerprints.stream() + .map(value -> "'" + StringEscapeUtils.escapeSql(value) + "'") + .collect(Collectors.joining(",")); + + handle.createStatement( + StringUtils.format( + "UPDATE %s SET used = true, used_status_last_updated = :created_date" + + " WHERE datasource = :datasource AND version = :version AND fingerprint IN (%s)", + dbTables.getSegmentSchemasTable(), inClause) + ) + .bind("created_date", DateTimes.nowUtc().toString()) + .bind("datasource", dataSource) + .bind("version", schemaVersion) + .execute(); + + // it is possible that a delete job could have actually deleted these unused schema + // in that scenario fail the transaction + Map<Boolean, Set<String>> unusedFingerprintStatus = fingerprintExistBatch(handle, dataSource, schemaVersion, unusedExistingFingerprints); + if (!unusedExistingFingerprints.equals(unusedFingerprintStatus.get(true))) { + throw new ISE("Failed to mark unused schema as used for datasource: [%s], fingerprints: [%s].", dataSource, unusedExistingFingerprints); + } + } + + Map<String, SchemaPayload> schemaPayloadToCreate = new HashMap<>(); + + for (Map.Entry<String, SchemaPayload> entry : fingerprintSchemaPayloadMap.entrySet()) { + if (!existingFingerprints.contains(entry.getKey())) { + schemaPayloadToCreate.put(entry.getKey(), entry.getValue()); + } + } + + if (schemaPayloadToCreate.isEmpty()) { + log.info("No schema to persist for dataSource [%s].", dataSource); + return; + } + + final List<List<String>> partitionedFingerprints = Lists.partition( + new ArrayList<>(schemaPayloadToCreate.keySet()), + DB_ACTION_PARTITION_SIZE + ); + + String insertSql = StringUtils.format( Review Comment: Also we should not publish schema if schemaVersion mismatch is there. 
########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -1947,13 +1992,46 @@ public int deletePendingSegments(String dataSource) private Set<DataSegment> announceHistoricalSegmentBatch( final Handle handle, final Set<DataSegment> segments, - final Set<DataSegment> usedSegments + final Set<DataSegment> usedSegments, + @Nullable final MinimalSegmentSchemas minimalSegmentSchemas ) throws IOException { final Set<DataSegment> toInsertSegments = new HashSet<>(); + Map<String, Long> fingerprintSchemaIdMap = null; + boolean schemaPresent = false; try { + if (!publishSchema()) { + log.info("Task schema publish is disabled."); + } + if (publishSchema() + && minimalSegmentSchemas != null + && minimalSegmentSchemas.isNonEmpty() + ) { + String dataSource = segments.stream().iterator().next().getDataSource(); + schemaPresent = true; + log.info("Persisting segment schema: [%s].", minimalSegmentSchemas); + segmentSchemaManager.persistSegmentSchema( + handle, + dataSource, + minimalSegmentSchemas.getSchemaFingerprintToPayloadMap(), + minimalSegmentSchemas.getSchemaVersion() + ); + + // fetch schemaId + fingerprintSchemaIdMap = Review Comment: Ideally we should just fetch the created schemas and not all the schemas again. Please make this change in a follow up PR. ########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -2128,9 +2230,51 @@ private void verifySegmentsToCommit(Collection<DataSegment> segments) * Callers of this method might need to retry as INSERT followed by SELECT * might fail due to race conditions. 
*/ - private Set<DataSegment> insertSegments(Handle handle, Set<DataSegment> segments) - throws IOException + private Set<DataSegment> insertSegments( + Handle handle, + Set<DataSegment> segments, + @Nullable MinimalSegmentSchemas minimalSegmentSchemas, + Set<DataSegmentWithSchemaInformation> appendAfterReplaceSegmentMetadata, + Map<SegmentId, SegmentId> newVersionForAppendToParent + ) throws IOException { + Map<String, Long> fingerprintSchemaIdMap = null; + boolean schemaPresent = false; + + if (!publishSchema()) { + log.info("Task schema publish is disabled."); + } + if (publishSchema() && minimalSegmentSchemas != null && minimalSegmentSchemas.isNonEmpty()) { + String dataSource = segments.iterator().next().getDataSource(); + schemaPresent = true; Review Comment: This all should be refactored to one method. ########## server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java: ########## @@ -540,6 +580,50 @@ protected void alterSegmentTableAddUsedFlagLastUpdated() } } + /** + * Adds new columns (used_status_last_updated, schema_id, segment_stats) to the "segments" table. + */ + protected void alterSegmentTableSchemaPersistenceEnabled() + { + final String tableName = tablesConfigSupplier.get().getSegmentsTable(); + + Map<String, String> columnNameTypes = new HashMap<>(); + columnNameTypes.put("used_status_last_updated", "varchar(255)"); + columnNameTypes.put("schema_id", "BIGINT"); + columnNameTypes.put("num_rows", "BIGINT"); + + Set<String> columnsToAdd = new HashSet<>(); + + for (String columnName : columnNameTypes.keySet()) { + if (tableHasColumn(tableName, columnName)) { + log.info("Table[%s] already has column[%s].", tableName, columnName); + } else { + columnsToAdd.add(columnName); + } + } + + List<String> alterCommands = new ArrayList<>(); + if (!columnsToAdd.isEmpty()) { + for (String columnName : columnsToAdd) { + alterCommands.add( Review Comment: Why are there multiple alter commands ? 
########## server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java: ########## @@ -906,6 +996,40 @@ public Void withHandle(Handle handle) } } + public void createSegmentSchemaTable(final String tableName) + { + createTable( + tableName, + ImmutableList.of( + StringUtils.format( + "CREATE TABLE %1$s (\n" + + " id %2$s NOT NULL,\n" + + " created_date VARCHAR(255) NOT NULL,\n" + + " datasource VARCHAR(255) NOT NULL,\n" + + " fingerprint VARCHAR(255) NOT NULL,\n" + + " payload %3$s NOT NULL,\n" + + " used BOOLEAN NOT NULL,\n" + + " used_status_last_updated VARCHAR(255) NOT NULL,\n" + + " version VARCHAR(255) NOT NULL,\n" Review Comment: This should be int ########## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ########## @@ -2147,17 +2291,53 @@ private Set<DataSegment> insertSegments(Handle handle, Set<DataSegment> segments final PreparedBatch batch = handle.prepareBatch(buildSqlToInsertSegments()); for (List<DataSegment> partition : partitionedSegments) { for (DataSegment segment : partition) { - batch.add() - .bind("id", segment.getId().toString()) - .bind("dataSource", segment.getDataSource()) - .bind("created_date", now) - .bind("start", segment.getInterval().getStart().toString()) - .bind("end", segment.getInterval().getEnd().toString()) - .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 
false : true) - .bind("version", segment.getVersion()) - .bind("used", true) - .bind("payload", jsonMapper.writeValueAsBytes(segment)) - .bind("used_status_last_updated", now); + String segmentId = segment.getId().toString(); + Long schemaId = null; + Long numRows = null; + + if (schemaPresent && + (minimalSegmentSchemas.getSegmentIdToMetadataMap().containsKey(segmentId) || + (newVersionForAppendToParent.containsKey(segment.getId()) + && minimalSegmentSchemas.getSegmentIdToMetadataMap() + .containsKey(newVersionForAppendToParent.get(segment.getId()).toString())))) { + String segmentIdToUse; + if (minimalSegmentSchemas.getSegmentIdToMetadataMap().containsKey(segmentId)) { + segmentIdToUse = segmentId; + } else { + segmentIdToUse = newVersionForAppendToParent.get(segment.getId()).toString(); Review Comment: This can be refactored into one method. ########## server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java: ########## @@ -906,6 +996,40 @@ public Void withHandle(Handle handle) } } + public void createSegmentSchemaTable(final String tableName) + { + createTable( + tableName, + ImmutableList.of( + StringUtils.format( + "CREATE TABLE %1$s (\n" + + " id %2$s NOT NULL,\n" + + " created_date VARCHAR(255) NOT NULL,\n" Review Comment: @kfaraz Create date is varchar? Should we change it to datetime? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
