jihoonson commented on a change in pull request #11536:
URL: https://github.com/apache/druid/pull/11536#discussion_r683867733
##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
##########
@@ -177,9 +193,53 @@ public Object startJob()
{
tuningConfig.getBasePersistDirectory().mkdirs();
lockBasePersistDirectory();
+ initializeExecutors();
return null;
}
+ private void throwPersistErrorIfExists()
+ {
+ if (persistError != null) {
+ throw new RE(persistError, "Error while persisting");
+ }
+ }
+
+ private void initializeExecutors()
+ {
+ log.info("There will be up to[%d] pending persists", maxPendingPersists);
Review comment:
How could this be useful except for debugging? If it is only useful for
debugging, it should not be logged at `info`.
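If the line stays, a minimal sketch of the change, assuming Druid's `Logger` is in scope as `log`:

```java
// Only emitted when debug logging is enabled for this class.
log.debug("There will be up to [%d] pending persists", maxPendingPersists);
```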
##########
File path: server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
##########
@@ -913,6 +939,10 @@ static InputRow createInputRow(String ts, String dim, Object met)
);
}
+ private void waitForPersists() throws InterruptedException
+ {
+ Thread.sleep(500);
Review comment:
Sleeps are bad. They will make the unit tests slower. Also, I bet all the
sleeps you added will make these tests quite flaky.
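If the persist future cannot be exposed to the tests (see the last comment below), a bounded poll is at least less fragile than a fixed sleep. The helper below is hypothetical and only illustrates the idea:

```java
// Hypothetical test helper (requires java.util.function.BooleanSupplier): returns as soon
// as the condition holds and fails fast on a hang, instead of always sleeping 500 ms.
private static void waitFor(BooleanSupplier condition, long timeoutMillis) throws InterruptedException
{
  final long deadline = System.currentTimeMillis() + timeoutMillis;
  while (!condition.getAsBoolean()) {
    if (System.currentTimeMillis() > deadline) {
      throw new AssertionError("Timed out waiting for persists");
    }
    Thread.sleep(10);
  }
}
```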
##########
File path: server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
##########
@@ -806,7 +831,8 @@ public void testTotalRowCount() throws Exception
appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "bob", 1), null);
Assert.assertEquals(4, appenderator.getTotalRowCount());
- appenderator.persistAll(null).get();
+ appenderator.persistAll(null);
+ waitForPersists();
Review comment:
Why not `persistAll(null).get()`?
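That is, keep the blocking call from the left-hand side of the diff, which waits for the persist to finish without any sleep:

```java
// Blocks until the persist completes, so the assertions that follow are deterministic.
appenderator.persistAll(null).get();
```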
##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
##########
@@ -110,20 +113,28 @@
private final ObjectMapper objectMapper;
private final IndexIO indexIO;
private final IndexMerger indexMerger;
- private final Map<SegmentIdWithShardSpec, Sink> sinks = new HashMap<>();
private final long maxBytesTuningConfig;
private final boolean skipBytesInMemoryOverheadCheck;
+ private volatile ListeningExecutorService persistExecutor = null;
+ private volatile ListeningExecutorService pushExecutor = null;
+ private final int maxPendingPersists;
+ private static final int DEFAULT_PENDING_PERSISTS = 2;
+ private static final int PERSIST_WARN_DELAY = 1000;
+ private volatile Throwable persistError;
+
+
+ private final ConcurrentHashMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>();
/**
* The following sinks metadata map and associated class are the way to retain metadata now that sinks
* are being completely removed from memory after each incremental persist.
*/
- private final Map<SegmentIdWithShardSpec, SinkMetadata> sinksMetadata = new HashMap<>();
+ private final ConcurrentHashMap<SegmentIdWithShardSpec, SinkMetadata> sinksMetadata = new ConcurrentHashMap<>();
Review comment:
Please document details of the concurrent access pattern.
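One possible shape for that documentation; the thread roles named below are placeholders to show the level of detail being asked for, and should be replaced with the actual executors wired up in this PR:

```java
/**
 * Metadata retained for each sink after it has been persisted and evicted from memory.
 * Illustrative access pattern (verify against the real wiring): written by the task
 * thread in add() and by the persist executor when an incremental persist completes;
 * read by the push executor when segments are merged and pushed. ConcurrentHashMap is
 * used because these threads may touch the map without holding a common lock.
 */
private final ConcurrentHashMap<SegmentIdWithShardSpec, SinkMetadata> sinksMetadata = new ConcurrentHashMap<>();
```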
##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
##########
@@ -110,20 +113,28 @@
private final ObjectMapper objectMapper;
private final IndexIO indexIO;
private final IndexMerger indexMerger;
- private final Map<SegmentIdWithShardSpec, Sink> sinks = new HashMap<>();
private final long maxBytesTuningConfig;
private final boolean skipBytesInMemoryOverheadCheck;
+ private volatile ListeningExecutorService persistExecutor = null;
+ private volatile ListeningExecutorService pushExecutor = null;
+ private final int maxPendingPersists;
+ private static final int DEFAULT_PENDING_PERSISTS = 2;
+ private static final int PERSIST_WARN_DELAY = 1000;
+ private volatile Throwable persistError;
+
+
+ private final ConcurrentHashMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>();
/**
* The following sinks metadata map and associated class are the way to retain metadata now that sinks
* are being completely removed from memory after each incremental persist.
*/
- private final Map<SegmentIdWithShardSpec, SinkMetadata> sinksMetadata = new HashMap<>();
+ private final ConcurrentHashMap<SegmentIdWithShardSpec, SinkMetadata> sinksMetadata = new ConcurrentHashMap<>();
// This variable updated in add(), persist(), and drop()
- private int rowsCurrentlyInMemory = 0;
+ private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
Review comment:
What threads can access this and `bytesCurrentlyInMemory` concurrently?
If there are any, please document details of the concurrent access pattern.
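For example, the existing field comment could be extended with the threads involved; the attributions below are illustrative only:

```java
// Updated in add() on the task thread and from callbacks that may run on the persist
// executor; AtomicInteger makes those unsynchronized updates safe. (Thread attributions
// here are placeholders and must be checked against the actual executor wiring.)
private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
```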
##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
##########
@@ -158,6 +169,11 @@
maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
skipBytesInMemoryOverheadCheck = tuningConfig.isSkipBytesInMemoryOverheadCheck();
+ if (tuningConfig.getMaxPendingPersists() < 1) {
+ maxPendingPersists = DEFAULT_PENDING_PERSISTS;
Review comment:
What is the rationale for the default of 2? The previous default was 0
which is infinite. I don't think we ever need to change this in production. The
doc for `maxPendingPersists` was not updated in
https://github.com/apache/druid/pull/11294, so whatever we change here, we
should fix the doc too.
##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
##########
@@ -110,20 +113,28 @@
private final ObjectMapper objectMapper;
private final IndexIO indexIO;
private final IndexMerger indexMerger;
- private final Map<SegmentIdWithShardSpec, Sink> sinks = new HashMap<>();
private final long maxBytesTuningConfig;
private final boolean skipBytesInMemoryOverheadCheck;
+ private volatile ListeningExecutorService persistExecutor = null;
+ private volatile ListeningExecutorService pushExecutor = null;
+ private final int maxPendingPersists;
+ private static final int DEFAULT_PENDING_PERSISTS = 2;
+ private static final int PERSIST_WARN_DELAY = 1000;
+ private volatile Throwable persistError;
+
+
+ private final ConcurrentHashMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>();
Review comment:
Can multiple threads access this map at the same time? I don't see any
unless I'm missing something. If there are any, please document details of the
concurrent access pattern. It helps people a lot including reviewers, other
developers, and your future-self to understand and remember how things work.
Also please check out
https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md. We
have reasons for including it in the PR template. I would highly suggest
reading it and marking the concurrency self-review item in your PR checklist.
##########
File path: server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTest.java
##########
@@ -382,12 +387,14 @@ public void testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadChec
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null);
+ waitForPersists();
Review comment:
I think you could keep the future of persist triggered in `add()` in a
variable. Then you can add a method used only for testing that returns the
persist future. Then you can finally wait for the future to be done instead of
sleeping.
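A rough sketch of that approach; the names `lastPersistFuture` and `getLastPersistFutureForTesting()` are hypothetical, just to illustrate the suggestion, and the wiring would need to match wherever the persist is actually submitted:

```java
// In BatchAppenderator: remember the most recently submitted persist.
// Guava's ListenableFuture and @VisibleForTesting are already used elsewhere in Druid.
private volatile ListenableFuture<?> lastPersistFuture = null;

// In add(), at the point where the incremental persist is handed to persistExecutor:
//   lastPersistFuture = persistExecutor.submit(persistRunnable);

@VisibleForTesting
ListenableFuture<?> getLastPersistFutureForTesting()
{
  return lastPersistFuture;
}
```

The tests could then replace `waitForPersists()` with something like:

```java
// Wait for the real persist instead of sleeping for a fixed amount of time.
// Assumes the test holds (or casts to) a BatchAppenderator reference.
final ListenableFuture<?> persistFuture = appenderator.getLastPersistFutureForTesting();
if (persistFuture != null) {
  persistFuture.get();
}
```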