mridulm commented on code in PR #2555:
URL: https://github.com/apache/celeborn/pull/2555#discussion_r1639180914
##########
common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java:
##########
@@ -56,7 +56,7 @@ public long getFileLength() {
return bytesFlushed;
}
- public void updateBytesFlushed(long bytes) {
+ public synchronized void updateBytesFlushed(long bytes) {
bytesFlushed += bytes;
Review Comment:
I am not sure about this change; it is not clear to me why we are locking
on `this` here.
I will revisit this later if you believe this is the right change. I am not sure
whether subclasses of this abstract class need this; if they do, in general I would
expect that kind of pattern to be difficult to evolve.
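One way to keep the locking policy from leaking into subclasses is a private lock object. The sketch below is illustrative only: `FileInfoSketch` and `flushLock` are hypothetical names, not Celeborn code, and an `AtomicLong` counter would be another option.

```java
import java.util.ArrayList;
import java.util.List;

class FlushCounterDemo {
  public static void main(String[] args) throws InterruptedException {
    FileInfoSketch info = new FileInfoSketch() {};
    List<Thread> threads = new ArrayList<>();
    for (int t = 0; t < 4; t++) {
      Thread th =
          new Thread(
              () -> {
                for (int i = 0; i < 10_000; i++) {
                  info.updateBytesFlushed(1);
                }
              });
      threads.add(th);
      th.start();
    }
    for (Thread th : threads) {
      th.join();
    }
    System.out.println(info.getFileLength()); // 40000
  }
}

// Hypothetical stand-in for FileInfo; not the actual Celeborn class.
abstract class FileInfoSketch {
  // Private lock: subclasses and outside callers cannot synchronize on it,
  // so the locking policy stays an implementation detail of this class.
  private final Object flushLock = new Object();
  private long bytesFlushed;

  public long getFileLength() {
    synchronized (flushLock) {
      return bytesFlushed;
    }
  }

  public void updateBytesFlushed(long bytes) {
    synchronized (flushLock) {
      bytesFlushed += bytes;
    }
  }
}
```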
##########
common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java:
##########
@@ -63,11 +63,11 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
private final AtomicLong timeOfLastRequestNs;
private final long pushTimeoutCheckerInterval;
- private static ScheduledExecutorService pushTimeoutChecker = null;
+ private ScheduledExecutorService pushTimeoutChecker;
private ScheduledFuture pushCheckerScheduleFuture;
private final long fetchTimeoutCheckerInterval;
- private static ScheduledExecutorService fetchTimeoutChecker = null;
+ private ScheduledExecutorService fetchTimeoutChecker;
private ScheduledFuture fetchCheckerScheduleFuture;
Review Comment:
Both of these are expected to be static: initialized once per class, not per instance.
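A minimal sketch of the once-per-class pattern, assuming the executor is created lazily by the first instance and shared afterwards. `ResponseHandlerSketch`, `timeoutChecker` and the thread name are hypothetical, not the actual `TransportResponseHandler` code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class SharedCheckerDemo {
  public static void main(String[] args) {
    ResponseHandlerSketch a = new ResponseHandlerSketch(1000);
    ResponseHandlerSketch b = new ResponseHandlerSketch(1000);
    // Both handlers run their timeout checks on the same class-level executor.
    System.out.println(a.checker() == b.checker()); // true
    a.close();
    b.close();
  }
}

// Hypothetical stand-in for TransportResponseHandler; not Celeborn code.
class ResponseHandlerSketch {
  // static: one checker thread per class, shared by every handler instance.
  private static ScheduledExecutorService timeoutChecker = null;

  private final ScheduledFuture<?> checkerFuture;

  ResponseHandlerSketch(long intervalMs) {
    ScheduledExecutorService checker;
    synchronized (ResponseHandlerSketch.class) {
      if (timeoutChecker == null) {
        // Created by whichever instance gets here first, reused afterwards.
        timeoutChecker =
            Executors.newSingleThreadScheduledExecutor(
                r -> {
                  Thread t = new Thread(r, "timeout-checker");
                  t.setDaemon(true);
                  return t;
                });
      }
      checker = timeoutChecker;
    }
    // Each instance owns only its scheduled task, not a thread.
    checkerFuture =
        checker.scheduleWithFixedDelay(
            this::checkTimeouts, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
  }

  private void checkTimeouts() {
    // Placeholder for the per-instance scan of outstanding requests.
  }

  ScheduledExecutorService checker() {
    synchronized (ResponseHandlerSketch.class) {
      return timeoutChecker;
    }
  }

  void close() {
    checkerFuture.cancel(false);
  }
}
```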
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/WorkerSecretRegistryImpl.java:
##########
@@ -65,12 +65,10 @@ public String load(String appId) {
return pbApplicationMeta.getSecret();
} catch (Throwable e) {
// We catch Throwable here because masterClient.askSync declares it in its definition.
- // If the secret is null, the authentication will fail so just logging the exception
- // here.
LOG.error(
"Failed to fetch the application meta info for {} from the master", appId, e);
+ throw new Exception(e);
Review Comment:
This should be fine, but I will let Chandni weigh in.
+CC @otterc
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java:
##########
@@ -96,14 +96,14 @@ public TimeWindow getFetchTimeMetric(long streamId) {
}
}
- public void chunkBeingSent(long streamId) {
+ public synchronized void chunkBeingSent(long streamId) {
StreamState streamState = streams.get(streamId);
if (streamState != null) {
streamState.chunksBeingTransferred++;
}
}
Review Comment:
This effectively becomes a global lock, right?
If the concern is an inconsistent update to `chunksBeingTransferred`
(which is valid), why not lock on `streamState` instead?
```suggestion
public void chunkBeingSent(long streamId) {
StreamState streamState = streams.get(streamId);
if (streamState != null) {
synchronized(streamState) {
streamState.chunksBeingTransferred++;
}
}
}
```
(The same applies to the similar methods below.)
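A self-contained sketch of the suggested per-stream locking, assuming `streams` is a `ConcurrentHashMap`. `StreamManagerSketch`, `StreamStateSketch`, `chunkSent` and `inFlight` are hypothetical stand-ins. Threads updating different streams never contend on the same monitor, which is the point of locking `streamState` rather than the manager.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class PerStreamLockDemo {
  public static void main(String[] args) throws InterruptedException {
    StreamManagerSketch manager = new StreamManagerSketch();
    manager.register(1L);
    manager.register(2L);
    List<Thread> threads = new ArrayList<>();
    for (int t = 0; t < 4; t++) {
      long streamId = (t % 2) + 1; // two threads per stream
      Thread th =
          new Thread(
              () -> {
                for (int i = 0; i < 10_000; i++) {
                  manager.chunkBeingSent(streamId);
                }
              });
      threads.add(th);
      th.start();
    }
    for (Thread th : threads) {
      th.join();
    }
    // prints "20000 20000"
    System.out.println(manager.inFlight(1L) + " " + manager.inFlight(2L));
  }
}

class StreamStateSketch {
  long chunksBeingTransferred = 0; // guarded by this object's monitor
}

class StreamManagerSketch {
  private final ConcurrentHashMap<Long, StreamStateSketch> streams = new ConcurrentHashMap<>();

  void register(long streamId) {
    streams.put(streamId, new StreamStateSketch());
  }

  void chunkBeingSent(long streamId) {
    StreamStateSketch streamState = streams.get(streamId);
    if (streamState != null) {
      synchronized (streamState) { // lock only this stream, not the manager
        streamState.chunksBeingTransferred++;
      }
    }
  }

  void chunkSent(long streamId) {
    StreamStateSketch streamState = streams.get(streamId);
    if (streamState != null) {
      synchronized (streamState) {
        streamState.chunksBeingTransferred--;
      }
    }
  }

  long inFlight(long streamId) {
    StreamStateSketch streamState = streams.get(streamId);
    if (streamState == null) {
      return 0;
    }
    synchronized (streamState) {
      return streamState.chunksBeingTransferred;
    }
  }
}
```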
##########
client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java:
##########
@@ -51,7 +51,7 @@
public class LocalPartitionReader implements PartitionReader {
private static final Logger logger = LoggerFactory.getLogger(LocalPartitionReader.class);
- private static volatile ThreadPoolExecutor readLocalShufflePool;
+ private volatile ThreadPoolExecutor readLocalShufflePool;
Review Comment:
This is expected to be a static variable, not an instance variable.
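For reference, the usual shape of a lazily created `static volatile` pool is double-checked locking, sketched below. `LocalReaderSketch`, `pool` and the pool sizing are hypothetical; the real reader's configuration is not shown in this hunk.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SharedPoolDemo {
  public static void main(String[] args) {
    ThreadPoolExecutor p1 = LocalReaderSketch.pool(2);
    ThreadPoolExecutor p2 = LocalReaderSketch.pool(2);
    System.out.println(p1 == p2); // true
    p1.shutdown();
  }
}

// Hypothetical stand-in for LocalPartitionReader; not Celeborn code.
class LocalReaderSketch {
  // static: one pool shared by every reader instance.
  // volatile: makes the unsynchronized first read below safe.
  private static volatile ThreadPoolExecutor readPool;

  static ThreadPoolExecutor pool(int threads) {
    ThreadPoolExecutor local = readPool;
    if (local == null) {
      synchronized (LocalReaderSketch.class) {
        local = readPool;
        if (local == null) {
          local =
              new ThreadPoolExecutor(
                  threads, threads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
          readPool = local;
        }
      }
    }
    return local;
  }
}
```

Later calls return the existing pool and ignore their `threads` argument, so every reader shares one pool. With an instance field, each reader would build its own.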
##########
common/src/main/java/org/apache/celeborn/common/network/sasl/anonymous/AnonymousSaslClientFactory.java:
##########
@@ -90,7 +91,7 @@ public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
throw new IllegalStateException("Authentication has already completed.");
}
isCompleted = true;
- return ANONYMOUS.getBytes();
+ return ANONYMOUS.getBytes(StandardCharsets.UTF_8);
Review Comment:
Nice!
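For context: `String.getBytes()` without an argument encodes with the platform default charset, which before JDK 18 depended on the OS and locale. The sketch below, assuming `ANONYMOUS` is the plain string `"ANONYMOUS"`, shows that an explicit charset pins down the bytes.

```java
import java.nio.charset.StandardCharsets;

class CharsetDemo {
  public static void main(String[] args) {
    // Assumed value of the ANONYMOUS constant.
    String mechanism = "ANONYMOUS";
    // Explicit charset: same bytes on every JVM, whatever the default charset.
    byte[] wire = mechanism.getBytes(StandardCharsets.UTF_8);
    System.out.println(wire.length); // 9

    // For non-ASCII text the charset choice changes the bytes themselves.
    String eAcute = "\u00e9";
    System.out.println(eAcute.getBytes(StandardCharsets.UTF_8).length); // 2
    System.out.println(eAcute.getBytes(StandardCharsets.ISO_8859_1).length); // 1

    // Decoding with the same explicit charset round-trips the value.
    System.out.println(new String(wire, StandardCharsets.UTF_8).equals(mechanism)); // true
  }
}
```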
##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -126,13 +125,7 @@ public class ShuffleClientImpl extends ShuffleClient {
private final boolean authEnabled;
private final TransportConf dataTransportConf;
- private final ThreadLocal<Compressor> compressorThreadLocal =
- new ThreadLocal<Compressor>() {
- @Override
- protected Compressor initialValue() {
- return Compressor.getCompressor(conf);
- }
- };
+ private ThreadLocal<Compressor> compressorThreadLocal;
Review Comment:
The change in this class does not look related to error-prone.
Do we want to split it out into a separate PR?
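If the initialization does need to move into the constructor, `ThreadLocal.withInitial` is a compact equivalent of the removed anonymous subclass and lets the field stay `final`. A minimal sketch; `ClientSketch`, `CompressorSketch` and the `"lz4"` codec name are hypothetical stand-ins, not the Celeborn `Compressor` API.

```java
class ThreadLocalDemo {
  public static void main(String[] args) throws InterruptedException {
    ClientSketch client = new ClientSketch("lz4");
    CompressorSketch first = client.compressor();
    CompressorSketch second = client.compressor();
    CompressorSketch[] fromOtherThread = new CompressorSketch[1];
    Thread t = new Thread(() -> fromOtherThread[0] = client.compressor());
    t.start();
    t.join();
    System.out.println(first == second); // true: reused within one thread
    System.out.println(first == fromOtherThread[0]); // false: each thread gets its own
  }
}

// Hypothetical stand-in for Compressor; not the Celeborn API.
class CompressorSketch {
  final String codec;

  CompressorSketch(String codec) {
    this.codec = codec;
  }
}

// Hypothetical stand-in for ShuffleClientImpl.
class ClientSketch {
  // Same behavior as an anonymous ThreadLocal subclass overriding initialValue(),
  // while keeping the field final.
  private final ThreadLocal<CompressorSketch> compressorThreadLocal;

  ClientSketch(String codec) {
    compressorThreadLocal = ThreadLocal.withInitial(() -> new CompressorSketch(codec));
  }

  CompressorSketch compressor() {
    return compressorThreadLocal.get();
  }
}
```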
##########
common/src/test/java/org/apache/celeborn/common/network/sasl/SaslTestBase.java:
##########
@@ -96,9 +96,7 @@ void authHelper(
TimeUnit.MILLISECONDS.sleep(10);
}
}
- if (error != null) {
- throw error;
- }
+ assertNull(error);
Review Comment:
`assertNull` would throw an `AssertionError` instead; here we are rethrowing the original `error` `Throwable`.
`testReRegisterationFails`, for example, requires that `error` itself be thrown.
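A minimal sketch of why the explicit rethrow matters, using a hypothetical `AuthHelperSketch.finish` in place of the tail of `authHelper`: callers catch the original exception type, which an `assertNull` failure would replace with an `AssertionError`.

```java
class RethrowDemo {
  public static void main(String[] args) {
    try {
      AuthHelperSketch.finish(new IllegalStateException("re-registration rejected"));
    } catch (IllegalStateException e) {
      System.out.println("caught original: " + e.getMessage());
    } catch (Throwable t) {
      System.out.println("unexpected: " + t);
    }
  }
}

// Hypothetical stand-in for the end of SaslTestBase.authHelper.
class AuthHelperSketch {
  static void finish(Throwable error) throws Throwable {
    // Rethrow the recorded failure unchanged so tests that expect a specific
    // exception type still see it.
    if (error != null) {
      throw error;
    }
  }
}
```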
##########
common/src/test/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManagerSuiteJ.java:
##########
@@ -101,15 +101,7 @@ public void testLoadMissingTrustStore() throws Exception {
assertThrows(
IOException.class,
- () -> {
- ReloadingX509TrustManager tm =
new ReloadingX509TrustManager(KeyStore.getDefaultType(), trustStore, "password", 10);
- try {
- tm.init();
- } finally {
- tm.destroy();
- }
- });
+ () -> new ReloadingX509TrustManager(KeyStore.getDefaultType(), trustStore, "password", 10));
Review Comment:
This no longer calls `init` (or `destroy`); the same applies below.
##########
common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java:
##########
@@ -200,20 +200,22 @@ public void run() {
} catch (InterruptedException e) {
running = false;
}
- try {
- if (running && needsReload()) {
- try {
- trustManagerRef.set(loadTrustManager());
- this.reloadCount += 1;
- } catch (Exception ex) {
- logger.warn(
"Could not load truststore (keep using existing one) : " + ex.toString(), ex);
+ synchronized (this) {
Review Comment:
Why do we need this `synchronized`?
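If the reload thread is the only writer, an `AtomicReference` swap already publishes the new trust manager to readers without a lock. The sketch below assumes that; `ReloaderSketch` and the string payloads are hypothetical stand-ins for the trust manager, and `reloadCount` is modeled as an `AtomicInteger`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class ReloadDemo {
  public static void main(String[] args) throws InterruptedException {
    ReloaderSketch r = new ReloaderSketch("v0");
    // A single reload thread, mirroring the background reloader.
    Thread reloader =
        new Thread(
            () -> {
              for (int i = 1; i <= 3; i++) {
                r.reload("v" + i);
              }
            });
    reloader.start();
    reloader.join();
    System.out.println(r.current() + " " + r.reloadCount()); // v3 3
  }
}

// Hypothetical stand-in for the reload loop of ReloadingX509TrustManager.
class ReloaderSketch {
  // AtomicReference.set makes the new value visible to reader threads.
  private final AtomicReference<String> ref;
  // Readable from other threads (e.g. tests) without synchronized.
  private final AtomicInteger reloadCount = new AtomicInteger();

  ReloaderSketch(String initial) {
    ref = new AtomicReference<>(initial);
  }

  void reload(String loaded) {
    ref.set(loaded);
    reloadCount.incrementAndGet();
  }

  String current() {
    return ref.get();
  }

  int reloadCount() {
    return reloadCount.get();
  }
}
```

The two updates are not atomic as a pair: a reader can briefly see the new value with the old count. If callers need both to be consistent together, a lock (or one immutable holder object) would be justified.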
--
This is an automated message from the Apache Git Service.