AmatyaAvadhanula commented on code in PR #15817:
URL: https://github.com/apache/druid/pull/15817#discussion_r1522894697
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java:
##########
@@ -180,7 +191,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws
Exception
org.apache.commons.io.FileUtils.deleteQuietly(persistDir);
FileUtils.mkdirp(persistDir);
- final Set<DataSegment> pushedSegments = mergeAndPushSegments(
+ final Pair<Set<DataSegment>, MinimalSegmentSchemas> pushedSegments =
mergeAndPushSegments(
Review Comment:
Can `SegmentAndSchemas` be moved from the multi-stage-query module to a core
module? It would be cleaner to use `SegmentAndSchemas` here instead of a
`Pair` of its two fields.
##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1098,6 +1129,146 @@ public DataSegment map(int index, ResultSet r,
StatementContext ctx) throws SQLE
);
}
+ private void doPollSegmentAndSchema()
+ {
+ log.debug("Starting polling of segment and schema table");
+
+ ConcurrentMap<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats =
new ConcurrentHashMap<>();
+
+ // some databases such as PostgreSQL require auto-commit turned off
+ // to stream results back, enabling transactions disables auto-commit
+ //
+ // setting connection to read-only will allow some database such as MySQL
+ // to automatically use read-only transaction mode, further optimizing the
query
+ final List<DataSegment> segments = connector.inReadOnlyTransaction(
+ new TransactionCallback<List<DataSegment>>()
+ {
+ @Override
+ public List<DataSegment> inTransaction(Handle handle,
TransactionStatus status)
+ {
+ return handle
+ .createQuery(StringUtils.format("SELECT payload, schema_id,
num_rows FROM %s WHERE used=true", getSegmentsTable()))
+ .setFetchSize(connector.getStreamingFetchSize())
+ .map(
+ new ResultSetMapper<DataSegment>()
+ {
+ @Override
+ public DataSegment map(int index, ResultSet r,
StatementContext ctx) throws SQLException
+ {
+ try {
+ DataSegment segment =
jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+ segmentStats.put(
+ segment.getId(),
+ new SegmentSchemaCache.SegmentStats(
+ (Long) r.getObject("schema_id"),
+ (Long) r.getObject("num_rows")
+ )
+ );
+ return replaceWithExistingSegmentIfPresent(segment);
+ }
+ catch (IOException e) {
+ log.makeAlert(e, "Failed to read segment from
db.").emit();
+ // If one entry in database is corrupted doPoll()
should continue to work overall. See
+ // filter by `Objects::nonNull` below in this method.
+ return null;
+ }
+ }
+ }
+ )
+ .list();
+ }
+ }
+ );
+
+ Map<Long, SchemaPayload> schemaMap = new HashMap<>();
+
+ String schemaPollQuery;
+ if (latestSegmentSchemaPoll == null) {
+ schemaPollQuery = StringUtils.format("SELECT id, payload, created_date
FROM %s", getSegmentSchemaTable());
+ } else {
+ schemaPollQuery = StringUtils.format(
+ "SELECT id, payload, created_date FROM %1$s where created_date >
'%2$s'",
+ getSegmentSchemaTable(),
+ latestSegmentSchemaPoll.toString());
+ }
+ String finalSchemaPollQuery = schemaPollQuery;
+ final DateTime[] maxCreatedDate = {latestSegmentSchemaPoll};
Review Comment:
Why is this an array? If it is only a mutable holder so the lambda can update
the value, consider using an `AtomicReference<DateTime>` or restructuring the
code so the reassignment isn't needed.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2829,4 +2972,71 @@ public String toString()
}
}
+ public static class DataSegmentWithSchemaInformation
Review Comment:
This class can be `private` since it is only used within this file.
##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1098,6 +1129,146 @@ public DataSegment map(int index, ResultSet r,
StatementContext ctx) throws SQLE
);
}
+ private void doPollSegmentAndSchema()
+ {
+ log.debug("Starting polling of segment and schema table");
Review Comment:
I think this can be logged at `info` level, to be consistent with the changes
in https://github.com/apache/druid/pull/15952.
##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1098,6 +1129,146 @@ public DataSegment map(int index, ResultSet r,
StatementContext ctx) throws SQLE
);
}
+ private void doPollSegmentAndSchema()
+ {
+ log.debug("Starting polling of segment and schema table");
+
+ ConcurrentMap<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats =
new ConcurrentHashMap<>();
+
+ // some databases such as PostgreSQL require auto-commit turned off
+ // to stream results back, enabling transactions disables auto-commit
+ //
+ // setting connection to read-only will allow some database such as MySQL
+ // to automatically use read-only transaction mode, further optimizing the
query
+ final List<DataSegment> segments = connector.inReadOnlyTransaction(
+ new TransactionCallback<List<DataSegment>>()
+ {
+ @Override
+ public List<DataSegment> inTransaction(Handle handle,
TransactionStatus status)
+ {
+ return handle
+ .createQuery(StringUtils.format("SELECT payload, schema_id,
num_rows FROM %s WHERE used=true", getSegmentsTable()))
+ .setFetchSize(connector.getStreamingFetchSize())
+ .map(
+ new ResultSetMapper<DataSegment>()
+ {
+ @Override
+ public DataSegment map(int index, ResultSet r,
StatementContext ctx) throws SQLException
+ {
+ try {
+ DataSegment segment =
jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+ segmentStats.put(
+ segment.getId(),
+ new SegmentSchemaCache.SegmentStats(
+ (Long) r.getObject("schema_id"),
+ (Long) r.getObject("num_rows")
+ )
+ );
+ return replaceWithExistingSegmentIfPresent(segment);
+ }
+ catch (IOException e) {
+ log.makeAlert(e, "Failed to read segment from
db.").emit();
+ // If one entry in database is corrupted doPoll()
should continue to work overall. See
+ // filter by `Objects::nonNull` below in this method.
+ return null;
+ }
+ }
+ }
+ )
+ .list();
+ }
+ }
+ );
+
+ Map<Long, SchemaPayload> schemaMap = new HashMap<>();
+
+ String schemaPollQuery;
+ if (latestSegmentSchemaPoll == null) {
+ schemaPollQuery = StringUtils.format("SELECT id, payload, created_date
FROM %s", getSegmentSchemaTable());
+ } else {
+ schemaPollQuery = StringUtils.format(
+ "SELECT id, payload, created_date FROM %1$s where created_date >
'%2$s'",
Review Comment:
Should the `created_date` comparison be greater than or equal to (`>=`)?
With a strict `>`, schema rows created at exactly the last-poll timestamp
could be skipped.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2245,29 +2376,41 @@ private void insertIntoUpgradeSegmentsTable(
}
}
- private List<DataSegment> retrieveSegmentsById(Handle handle, String
datasource, Set<String> segmentIds)
+ private List<DataSegmentWithSchemaInformation> retrieveSegmentsById(Handle
handle, String datasource, Set<String> segmentIds)
{
if (segmentIds.isEmpty()) {
return Collections.emptyList();
}
return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables,
jsonMapper)
- .retrieveSegmentsById(datasource,
segmentIds)
+ .retrieveSegmentsById(datasource,
segmentIds, centralizedDatasourceSchemaConfig.isEnabled())
.stream()
.map(DataSegmentPlus::getDataSegment)
+ .map(v -> new DataSegmentWithSchemaInformation(v, null, null))
Review Comment:
Why are the numRows and schemaId always null here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]