jihoonson commented on a change in pull request #9935:
URL: https://github.com/apache/druid/pull/9935#discussion_r446465318
##########
File path:
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
##########
@@ -932,83 +930,100 @@ public int deletePendingSegments(String dataSource)
* Attempts to insert a single segment to the database. If the segment
already exists, will do nothing; although,
* this checking is imperfect and callers must be prepared to retry their
entire transaction on exceptions.
*
- * @return true if the segment was added, false if it already existed
+ * @return DataSegment set inserted
*/
- private boolean announceHistoricalSegment(
+ private Set<DataSegment> announceHistoricalSegmentBatch(
final Handle handle,
- final DataSegment segment,
- final boolean used
+ final Set<DataSegment> segments,
+ final Set<DataSegment> usedSegments
) throws IOException
{
+ final Set<DataSegment> toInsertSegments = new HashSet<>();
try {
- if (segmentExists(handle, segment)) {
- log.info("Found [%s] in DB, not updating DB", segment.getId());
- return false;
+ Set<String> existedSegments = segmentExistsBatch(handle, segments);
+ for (DataSegment segment : segments) {
+ if (existedSegments.contains(segment.getId().toString())) {
+ log.info("Found [%s] in DB, not updating DB", segment.getId());
+ } else {
+ toInsertSegments.add(segment);
+ }
}
// SELECT -> INSERT can fail due to races; callers must be prepared to
retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent
transaction-splitting.
- final int numRowsInserted = handle.createStatement(
+ final List<DataSegment> segmentList = new ArrayList<>(toInsertSegments);
+
+ PreparedBatch preparedBatch = handle.prepareBatch(
StringUtils.format(
- "INSERT INTO %1$s (id, dataSource, created_date, start,
%2$send%2$s, partitioned, version, used, "
- + "payload) "
- + "VALUES (:id, :dataSource, :created_date, :start, :end,
:partitioned, :version, :used, :payload)",
+ "INSERT INTO %1$s (id, dataSource, created_date, start,
%2$send%2$s, partitioned, version, used, payload) "
+ + "VALUES (:id, :dataSource, :created_date, :start, :end,
:partitioned, :version, :used, :payload)",
dbTables.getSegmentsTable(),
connector.getQuoteString()
)
- )
+ );
+
+ for (int i = 0; i < segmentList.size(); i++) {
+ DataSegment segment = segmentList.get(i);
+ preparedBatch.add()
.bind("id", segment.getId().toString())
.bind("dataSource", segment.getDataSource())
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", (segment.getShardSpec() instanceof
NoneShardSpec) ? false : true)
.bind("version", segment.getVersion())
- .bind("used", used)
- .bind("payload", jsonMapper.writeValueAsBytes(segment))
- .execute();
-
- if (numRowsInserted == 1) {
- log.info(
- "Published segment [%s] to DB with used flag [%s], json[%s]",
- segment.getId(),
- used,
- jsonMapper.writeValueAsString(segment)
- );
- } else if (numRowsInserted == 0) {
- throw new ISE(
- "Failed to publish segment[%s] to DB with used flag[%s], json[%s]",
- segment.getId(),
- used,
- jsonMapper.writeValueAsString(segment)
- );
- } else {
- throw new ISE(
- "numRowsInserted[%s] is larger than 1 after inserting segment[%s]
with used flag[%s], json[%s]",
- numRowsInserted,
- segment.getId(),
- used,
- jsonMapper.writeValueAsString(segment)
- );
+ .bind("used", usedSegments.contains(segment))
+ .bind("payload", jsonMapper.writeValueAsBytes(segment));
+
+ if ((i + 1) % ANNOUNCE_HISTORICAL_SEGMENG_BATCH == 0 || i ==
segmentList.size() - 1) {
+ int[] affectedRows = preparedBatch.execute();
+ for (int j = 0; j < affectedRows.length; j++) {
+ DataSegment insertSegment = segmentList.get(i /
ANNOUNCE_HISTORICAL_SEGMENG_BATCH * ANNOUNCE_HISTORICAL_SEGMENG_BATCH + j);
+ if (affectedRows[j] == 1) {
+ log.info(
+ "Published segment [%s] to DB with used flag [%s], json[%s]",
+ insertSegment.getId(),
+ usedSegments.contains(insertSegment),
+ jsonMapper.writeValueAsString(insertSegment)
+ );
+ } else {
+ throw new ISE(
+ "Failed to publish segment[%s] to DB with used flag[%s],
json[%s]",
+ insertSegment.getId(),
+ usedSegments.contains(insertSegment),
+ jsonMapper.writeValueAsString(insertSegment)
+ );
+ }
+ }
+ }
}
}
catch (Exception e) {
- log.error(e, "Exception inserting segment [%s] with used flag [%s] into
DB", segment.getId(), used);
+ for (DataSegment segment : segments) {
+ log.error(e, "Exception inserting segment [%s] with used flag [%s]
into DB", segment.getId(), usedSegments.contains(segment));
+ }
Review comment:
Same here. Better to be `log.errorSegments(segments, "Exception
inserting segments");`
##########
File path:
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
##########
@@ -932,83 +930,100 @@ public int deletePendingSegments(String dataSource)
* Attempts to insert a single segment to the database. If the segment
already exists, will do nothing; although,
* this checking is imperfect and callers must be prepared to retry their
entire transaction on exceptions.
*
- * @return true if the segment was added, false if it already existed
+ * @return DataSegment set inserted
*/
- private boolean announceHistoricalSegment(
+ private Set<DataSegment> announceHistoricalSegmentBatch(
final Handle handle,
- final DataSegment segment,
- final boolean used
+ final Set<DataSegment> segments,
+ final Set<DataSegment> usedSegments
) throws IOException
{
+ final Set<DataSegment> toInsertSegments = new HashSet<>();
try {
- if (segmentExists(handle, segment)) {
- log.info("Found [%s] in DB, not updating DB", segment.getId());
- return false;
+ Set<String> existedSegments = segmentExistsBatch(handle, segments);
+ for (DataSegment segment : segments) {
+ if (existedSegments.contains(segment.getId().toString())) {
+ log.info("Found [%s] in DB, not updating DB", segment.getId());
+ } else {
+ toInsertSegments.add(segment);
+ }
}
// SELECT -> INSERT can fail due to races; callers must be prepared to
retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent
transaction-splitting.
- final int numRowsInserted = handle.createStatement(
+ final List<DataSegment> segmentList = new ArrayList<>(toInsertSegments);
+
+ PreparedBatch preparedBatch = handle.prepareBatch(
StringUtils.format(
- "INSERT INTO %1$s (id, dataSource, created_date, start,
%2$send%2$s, partitioned, version, used, "
- + "payload) "
- + "VALUES (:id, :dataSource, :created_date, :start, :end,
:partitioned, :version, :used, :payload)",
+ "INSERT INTO %1$s (id, dataSource, created_date, start,
%2$send%2$s, partitioned, version, used, payload) "
+ + "VALUES (:id, :dataSource, :created_date, :start, :end,
:partitioned, :version, :used, :payload)",
dbTables.getSegmentsTable(),
connector.getQuoteString()
)
- )
+ );
+
+ for (int i = 0; i < segmentList.size(); i++) {
+ DataSegment segment = segmentList.get(i);
+ preparedBatch.add()
.bind("id", segment.getId().toString())
.bind("dataSource", segment.getDataSource())
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", (segment.getShardSpec() instanceof
NoneShardSpec) ? false : true)
.bind("version", segment.getVersion())
- .bind("used", used)
- .bind("payload", jsonMapper.writeValueAsBytes(segment))
- .execute();
-
- if (numRowsInserted == 1) {
- log.info(
- "Published segment [%s] to DB with used flag [%s], json[%s]",
- segment.getId(),
- used,
- jsonMapper.writeValueAsString(segment)
- );
- } else if (numRowsInserted == 0) {
- throw new ISE(
- "Failed to publish segment[%s] to DB with used flag[%s], json[%s]",
- segment.getId(),
- used,
- jsonMapper.writeValueAsString(segment)
- );
- } else {
- throw new ISE(
- "numRowsInserted[%s] is larger than 1 after inserting segment[%s]
with used flag[%s], json[%s]",
- numRowsInserted,
- segment.getId(),
- used,
- jsonMapper.writeValueAsString(segment)
- );
+ .bind("used", usedSegments.contains(segment))
+ .bind("payload", jsonMapper.writeValueAsBytes(segment));
+
+ if ((i + 1) % ANNOUNCE_HISTORICAL_SEGMENG_BATCH == 0 || i ==
segmentList.size() - 1) {
+ int[] affectedRows = preparedBatch.execute();
+ for (int j = 0; j < affectedRows.length; j++) {
+ DataSegment insertSegment = segmentList.get(i /
ANNOUNCE_HISTORICAL_SEGMENG_BATCH * ANNOUNCE_HISTORICAL_SEGMENG_BATCH + j);
+ if (affectedRows[j] == 1) {
+ log.info(
+ "Published segment [%s] to DB with used flag [%s], json[%s]",
Review comment:
This will corrupt the logs too. How about modifying as the below?
```java
final List<List<DataSegment>> partitionedSegments = Lists.partition(
new ArrayList<>(toInsertSegments),
MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE
);
PreparedBatch preparedBatch = handle.prepareBatch(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start,
%2$send%2$s, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end,
:partitioned, :version, :used, :payload)",
dbTables.getSegmentsTable(),
connector.getQuoteString()
)
);
for (List<DataSegment> partition : partitionedSegments) {
for (DataSegment segment : partition) {
preparedBatch.add()
.bind("id", segment.getId().toString())
.bind("dataSource", segment.getDataSource())
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start",
segment.getInterval().getStart().toString())
.bind("end",
segment.getInterval().getEnd().toString())
.bind("partitioned", (segment.getShardSpec()
instanceof NoneShardSpec) ? false : true)
.bind("version", segment.getVersion())
.bind("used", usedSegments.contains(segment))
.bind("payload",
jsonMapper.writeValueAsBytes(segment));
}
final int[] affectedRows = preparedBatch.execute();
final boolean succeeded =
Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
if (succeeded) {
log.infoSegments(partition, "Published segments to DB");
} else {
final List<DataSegment> failedToPublish = IntStream.range(0,
partition.size())
.filter(i ->
affectedRows[i] != 1)
.mapToObj(partition::get)
.collect(Collectors.toList());
throw new ISE(
"Failed to publish segments to DB: %s",
SegmentUtils.commaSeparatedIdentifiers(failedToPublish)
);
}
}
}
```
##########
File path:
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
##########
@@ -77,12 +79,14 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
/**
*/
public class IndexerSQLMetadataStorageCoordinator implements
IndexerMetadataStorageCoordinator
{
private static final Logger log = new
Logger(IndexerSQLMetadataStorageCoordinator.class);
+ private static final int ANNOUNCE_HISTORICAL_SEGMENG_BATCH = 100;
Review comment:
There is a typo in `ANNOUNCE_HISTORICAL_SEGMENG_BATCH` (SEGMENG). I
would recommend to rename to be more clear such as
`MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE`.
##########
File path:
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
##########
@@ -932,83 +930,100 @@ public int deletePendingSegments(String dataSource)
* Attempts to insert a single segment to the database. If the segment
already exists, will do nothing; although,
* this checking is imperfect and callers must be prepared to retry their
entire transaction on exceptions.
*
- * @return true if the segment was added, false if it already existed
+ * @return DataSegment set inserted
*/
- private boolean announceHistoricalSegment(
+ private Set<DataSegment> announceHistoricalSegmentBatch(
final Handle handle,
- final DataSegment segment,
- final boolean used
+ final Set<DataSegment> segments,
+ final Set<DataSegment> usedSegments
) throws IOException
{
+ final Set<DataSegment> toInsertSegments = new HashSet<>();
try {
- if (segmentExists(handle, segment)) {
- log.info("Found [%s] in DB, not updating DB", segment.getId());
- return false;
+ Set<String> existedSegments = segmentExistsBatch(handle, segments);
+ for (DataSegment segment : segments) {
+ if (existedSegments.contains(segment.getId().toString())) {
+ log.info("Found [%s] in DB, not updating DB", segment.getId());
Review comment:
This will corrupt the overlord logs by printing the similar logs over
again. How about printing them all at once? You can do by doing like this:
```java
Set<String> existedSegments = segmentExistsBatch(handle, segments);
log.info("Found these segments already exist in DB: %s",
existedSegments);
for (DataSegment segment : segments) {
if (!existedSegments.contains(segment.getId().toString())) {
toInsertSegments.add(segment);
}
}
```
##########
File path:
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
##########
@@ -932,83 +930,100 @@ public int deletePendingSegments(String dataSource)
* Attempts to insert a single segment to the database. If the segment
already exists, will do nothing; although,
* this checking is imperfect and callers must be prepared to retry their
entire transaction on exceptions.
*
- * @return true if the segment was added, false if it already existed
+ * @return DataSegment set inserted
*/
- private boolean announceHistoricalSegment(
+ private Set<DataSegment> announceHistoricalSegmentBatch(
final Handle handle,
- final DataSegment segment,
- final boolean used
+ final Set<DataSegment> segments,
+ final Set<DataSegment> usedSegments
) throws IOException
{
+ final Set<DataSegment> toInsertSegments = new HashSet<>();
try {
- if (segmentExists(handle, segment)) {
- log.info("Found [%s] in DB, not updating DB", segment.getId());
- return false;
+ Set<String> existedSegments = segmentExistsBatch(handle, segments);
+ for (DataSegment segment : segments) {
+ if (existedSegments.contains(segment.getId().toString())) {
+ log.info("Found [%s] in DB, not updating DB", segment.getId());
+ } else {
+ toInsertSegments.add(segment);
+ }
}
// SELECT -> INSERT can fail due to races; callers must be prepared to
retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent
transaction-splitting.
- final int numRowsInserted = handle.createStatement(
+ final List<DataSegment> segmentList = new ArrayList<>(toInsertSegments);
Review comment:
This will copy the whole set of `toInsertSegments` which doesn't seem
necessary. Can we use `toInsertSegment` instead in the [below for
loop](https://github.com/apache/druid/pull/9935/files#diff-7dab61c46825adfbdc9166d4fb8a30a0R966)?
##########
File path:
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
##########
@@ -77,12 +79,14 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
/**
*/
public class IndexerSQLMetadataStorageCoordinator implements
IndexerMetadataStorageCoordinator
{
private static final Logger log = new
Logger(IndexerSQLMetadataStorageCoordinator.class);
+ private static final int ANNOUNCE_HISTORICAL_SEGMENG_BATCH = 100;
Review comment:
Out of curiosity, how did you come up with this number? It would be nice
to leave a comment if there was some rationale behind it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]