rmcyang commented on code in PR #2105:
URL: 
https://github.com/apache/incubator-celeborn/pull/2105#discussion_r1399693569


##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4053,4 +4058,12 @@ object CelebornConf extends Logging {
       .doc("Kerberos keytab file path for HDFS storage connection.")
       .stringConf
       .createOptional
+

Review Comment:
   Spark uses the configs `NETWORK_CRYPTO_ENABLED` and `SASL_ENCRYPTION_ENABLED` to
determine whether encryption is enabled. Are we going to introduce similar config(s) for
Celeborn?



##########
common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java:
##########
@@ -250,8 +258,29 @@ public void initChannel(SocketChannel ch) {
     }
 
     TransportClient client = clientRef.get();
+    Channel channel = channelRef.get();
     assert client != null : "Channel future completed successfully with null client";
 
+    // Execute any client bootstraps synchronously before marking the Client as successful.
+    long preBootstrap = System.nanoTime();
+    logger.debug("Connection to {} successful, running bootstraps...", address);
+    try {
+      for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
+        clientBootstrap.doBootstrap(client, channel);
+      }
+    } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
+      long bootstrapTimeMs = Duration.ofNanos(System.nanoTime() - preBootstrap).toMillis();
+      logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
+      client.close();
+      throw Throwables.propagate(e);
+    }
+    long postBootstrap = System.nanoTime();
+    logger.info(
+        "Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
+        address,
+        (postBootstrap - preConnect) / 1000000,
+        (postBootstrap - preBootstrap) / 1000000);

Review Comment:
   Could we also use `Duration.ofNanos(...).toMillis()`, which is already used at L272,
for this millisecond conversion?
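
To illustrate the suggestion, here is a minimal, self-contained sketch comparing the two conversions. The class and method names (`ElapsedMillis`, `manualMillis`, `durationMillis`) are hypothetical and not part of the PR; only `Duration.ofNanos(...).toMillis()` and the division by `1000000` come from the diff. For non-negative elapsed times both forms truncate to whole milliseconds and should agree.

```java
import java.time.Duration;

public class ElapsedMillis {
  // Manual conversion, as in the logger.info call in the diff:
  // integer division of a nanosecond delta by one million.
  static long manualMillis(long startNanos, long endNanos) {
    return (endNanos - startNanos) / 1000000;
  }

  // The same conversion expressed with java.time.Duration,
  // as already done in the catch block of the diff.
  static long durationMillis(long startNanos, long endNanos) {
    return Duration.ofNanos(endNanos - startNanos).toMillis();
  }

  public static void main(String[] args) {
    long start = System.nanoTime();
    long end = start + 2_500_000L; // pretend 2.5 ms have elapsed
    System.out.println(manualMillis(start, end));
    System.out.println(durationMillis(start, end));
  }
}
```

Using one form in both places would mainly be a consistency and readability gain; it is not expected to change the logged values.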



##########
common/src/main/java/org/apache/celeborn/common/network/sasl/CelebornSaslClient.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.sasl;
+
+import static org.apache.celeborn.common.network.sasl.CelebornSaslServer.*;
+import static org.apache.celeborn.common.network.sasl.SaslConstants.*;
+
+import java.util.Map;
+
+import javax.annotation.Nullable;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.network.sasl.anonymous.AnonymousSaslProvider;
+
+/**
+ * A SASL Client for Celeborn which simply keeps track of the state of a single SASL session, from
+ * the initial state to the "authenticated" state. This client initializes the protocol via a
+ * firstToken, which is then followed by a set of challenges and responses.
+ */
+public class CelebornSaslClient {
+  private static final Logger logger = LoggerFactory.getLogger(CelebornSaslClient.class);
+
+  private SaslClient saslClient;
+
+  public CelebornSaslClient(
+      String saslMechanism,
+      @Nullable Map<String, String> saslProps,
+      @Nullable CallbackHandler authCallbackHandler) {
+    AnonymousSaslProvider.initializeIfNeeded();
+    Preconditions.checkNotNull(saslMechanism);
+    try {
+      this.saslClient =
+          Sasl.createSaslClient(
+              new String[] {saslMechanism},
+              null,
+              null,
+              DEFAULT_REALM,
+              saslProps,
+              authCallbackHandler);
+    } catch (SaslException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /** Used to initiate SASL handshake with server. */
+  public synchronized byte[] firstToken() {
+    if (saslClient != null && saslClient.hasInitialResponse()) {
+      try {
+        return saslClient.evaluateChallenge(new byte[0]);
+      } catch (SaslException e) {
+        throw Throwables.propagate(e);
+      }
+    } else {
+      return new byte[0];
+    }
+  }
+
+  /** Determines whether the authentication exchange has completed. */
+  public synchronized boolean isComplete() {
+    return saslClient != null && saslClient.isComplete();
+  }
+
+  /** Returns the value of a negotiated property. */
+  public Object getNegotiatedProperty(String name) {
+    return saslClient.getNegotiatedProperty(name);
+  }
+
+  /**
+   * Respond to server's SASL token.
+   *
+   * @param token contains server's SASL token
+   * @return client's response SASL token
+   */
+  public synchronized byte[] response(byte[] token) {
+    try {
+      return saslClient != null ? saslClient.evaluateChallenge(token) : new byte[0];
+    } catch (SaslException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Disposes of any system resources or security-sensitive information the SaslClient might be
+   * using.
+   */
+  public synchronized void dispose() {
+    if (saslClient != null) {
+      try {
+        saslClient.dispose();
+      } catch (SaslException e) {
+        // ignore
+      } finally {
+        saslClient = null;
+      }
+    }
+  }
+
+  /**
+   * Implementation of javax.security.auth.callback.CallbackHandler that works with share secrets.
+   */
+  public static class ClientCallbackHandler implements CallbackHandler {
+
+    private final String id;
+    private final String password;
+
+    public ClientCallbackHandler(String id, String password) {
+      this.id = Preconditions.checkNotNull(id, "id");
+      this.password = Preconditions.checkNotNull(password, "password");

Review Comment:
   The error messages are vague here; we could probably improve them to something like:
   ```
   Id(Password) cannot be null if SASL is enabled.
   ```
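
A rough sketch of what the more descriptive messages could look like. To keep the example self-contained it uses `java.util.Objects.requireNonNull` from the JDK instead of Guava's `Preconditions.checkNotNull`, which accepts a message argument in the same position. The class name `SaslCredentials` is hypothetical and stands in for `ClientCallbackHandler`; the exact wording of the messages is only a suggestion.

```java
import java.util.Objects;

// Hypothetical stand-in for ClientCallbackHandler, reduced to the null checks.
public class SaslCredentials {
  private final String id;
  private final String password;

  public SaslCredentials(String id, String password) {
    // Each message states which argument was missing and why it is required.
    this.id = Objects.requireNonNull(id, "Id cannot be null if SASL is enabled.");
    this.password =
        Objects.requireNonNull(password, "Password cannot be null if SASL is enabled.");
  }

  public String getId() {
    return id;
  }

  public static void main(String[] args) {
    try {
      new SaslCredentials(null, "secret");
    } catch (NullPointerException e) {
      // The exception message now tells the caller what to fix.
      System.out.println(e.getMessage());
    }
  }
}
```

With the current `"id"` and `"password"` messages, the resulting `NullPointerException` only carries the bare argument name, which is easy to misread in a stack trace.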



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
