mridulm commented on code in PR #2231:
URL:
https://github.com/apache/incubator-celeborn/pull/2231#discussion_r1454049159
##########
common/src/main/java/org/apache/celeborn/common/network/sasl/SaslClientBootstrap.java:
##########
@@ -84,8 +84,15 @@ public void doBootstrap(TransportClient client, Channel
channel) {
while (!saslClient.isComplete()) {
PbSaslRequest.Builder builder = PbSaslRequest.newBuilder();
if (firstToken) {
-
builder.setMethod(DIGEST_MD5).setAuthType(PbAuthType.CONNECTION_AUTH);
+ builder.setMethod(DIGEST_MD5);
}
+ // Setting the auth type to CONNECTION_AUTH for every message not just
the first one. This
+ // is
+ // because in Protobuf, for enums, the default value is the first
defined enum value which
+ // is the
+ // CLIENT_AUTH. It will be incorrect to set the auth type to
client_auth for every SASL
+ // message.
Review Comment:
We should have undefined or unknown as the first value in the enum - which
prevents this sort of issue.
(See
[here](https://protobuf.dev/programming-guides/dos-donts/#unspecified-enum)).
Looks like we missed this for the enums we added - and that can be used to
avoid this problem.
##########
common/src/main/java/org/apache/celeborn/common/network/sasl/registration/RegistrationClientBootstrap.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.sasl.registration;
+
+import static org.apache.celeborn.common.network.sasl.SaslUtils.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.exception.CelebornException;
+import org.apache.celeborn.common.network.client.TransportClient;
+import org.apache.celeborn.common.network.client.TransportClientBootstrap;
+import org.apache.celeborn.common.network.protocol.TransportMessage;
+import org.apache.celeborn.common.network.sasl.CelebornSaslClient;
+import org.apache.celeborn.common.network.sasl.SaslClientBootstrap;
+import org.apache.celeborn.common.network.sasl.SaslCredentials;
+import org.apache.celeborn.common.network.sasl.SaslTimeoutException;
+import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.protocol.MessageType;
+import org.apache.celeborn.common.protocol.PbAuthType;
+import org.apache.celeborn.common.protocol.PbAuthenticationInitiationRequest;
+import org.apache.celeborn.common.protocol.PbAuthenticationInitiationResponse;
+import org.apache.celeborn.common.protocol.PbRegisterApplicationRequest;
+import org.apache.celeborn.common.protocol.PbRegisterApplicationResponse;
+import org.apache.celeborn.common.protocol.PbSaslMechanism;
+import org.apache.celeborn.common.protocol.PbSaslRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+/**
+ * Bootstraps a {@link TransportClient} by registering application (if the
application is not
+ * registered). If the application is already registered, it will bootstrap
the client by performing
+ * SASL authentication.
+ */
+public class RegistrationClientBootstrap implements TransportClientBootstrap {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RegistrationClientBootstrap.class);
+
+ private static final String VERSION = "1.0";
+
+ /**
+ * TODO: This should be made configurable. For now, we only support
ANONYMOUS for client-auth and
+ * DIGEST-MD5 for connect-auth.
+ */
+ private static final List<PbSaslMechanism> SASL_MECHANISMS =
+ Lists.newArrayList(
+ PbSaslMechanism.newBuilder()
+ .setMechanism(ANONYMOUS)
+ .addAuthTypes(PbAuthType.CLIENT_AUTH)
+ .build(),
+ PbSaslMechanism.newBuilder()
+ .setMechanism(DIGEST_MD5)
+ .addAuthTypes(PbAuthType.CONNECTION_AUTH)
+ .build());
+
+ private final TransportConf conf;
+ private final String appId;
+ private final SaslCredentials saslCredentials;
+
+ private final RegistrationInfo registrationInfo;
+
+ public RegistrationClientBootstrap(
+ TransportConf conf,
+ String appId,
+ SaslCredentials saslCredentials,
+ RegistrationInfo registrationInfo) {
+ this.conf = Preconditions.checkNotNull(conf, "conf");
+ this.appId = Preconditions.checkNotNull(appId, "appId");
+ this.saslCredentials = Preconditions.checkNotNull(saslCredentials,
"saslCredentials");
+ this.registrationInfo = Preconditions.checkNotNull(registrationInfo,
"registrationInfo");
+ }
+
+ @Override
+ public void doBootstrap(TransportClient client, Channel channel) throws
RuntimeException {
+ if (registrationInfo.getRegistrationState() ==
RegistrationInfo.RegistrationState.REGISTERED) {
+ LOG.info("client has already registered, skip register.");
+ doSaslBootstrap(client, channel);
+ return;
+ }
+ try {
+ LOG.info("authentication initiation started for {}", appId);
+ doAuthInitiation(client);
+ LOG.info("authentication initiation successful for {}", appId);
+ doClientAuthentication(client);
+ LOG.info("client authenticated for {}", appId);
+ register(client);
+ LOG.info("Registration for {}", appId);
+
registrationInfo.setRegistrationState(RegistrationInfo.RegistrationState.REGISTERED);
+ } catch (Throwable e) {
+ LOG.error("Registration failed for {}", appId, e);
+
registrationInfo.setRegistrationState(RegistrationInfo.RegistrationState.FAILED);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void doAuthInitiation(TransportClient client) throws IOException,
CelebornException {
+ PbAuthenticationInitiationRequest authInitRequest =
+ PbAuthenticationInitiationRequest.newBuilder()
+ .setVersion(VERSION)
+ .setAuthEnabled(true)
+ .addAllSaslMechanisms(SASL_MECHANISMS)
+ .build();
+ TransportMessage msg =
+ new TransportMessage(
+ MessageType.AUTHENTICATION_INITIATION_REQUEST,
authInitRequest.toByteArray());
+ ByteBuffer authInitResponseBuffer;
+ try {
+ authInitResponseBuffer = client.sendRpcSync(msg.toByteBuffer(),
conf.saslTimeoutMs());
+ } catch (RuntimeException ex) {
+ // TODO: Auth initiation timed out. Will just throw SaslTimeoutException
for now
+ if (ex.getCause() instanceof TimeoutException) {
+ throw new SaslTimeoutException(ex.getCause());
+ } else {
+ throw ex;
+ }
+ }
+ PbAuthenticationInitiationResponse authInitResponse =
+
TransportMessage.fromByteBuffer(authInitResponseBuffer).getParsedPayload();
+ if (!validateServerResponse(authInitResponse)) {
+ String exMsg =
+ "Registration failed due to incompatibility with the server."
+ + " InitRequest: "
+ + authInitRequest
+ + " InitResponse: "
+ + authInitResponse;
+ throw new CelebornException(exMsg);
+ }
+ // TODO: Client picks up negotiated sasl mechanisms
Review Comment:
Or rather, the client validates that a required/supported mechanism is present.
##########
common/src/main/java/org/apache/celeborn/common/network/sasl/registration/RegistrationClientBootstrap.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.sasl.registration;
+
+import static org.apache.celeborn.common.network.sasl.SaslUtils.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.exception.CelebornException;
+import org.apache.celeborn.common.network.client.TransportClient;
+import org.apache.celeborn.common.network.client.TransportClientBootstrap;
+import org.apache.celeborn.common.network.protocol.TransportMessage;
+import org.apache.celeborn.common.network.sasl.CelebornSaslClient;
+import org.apache.celeborn.common.network.sasl.SaslClientBootstrap;
+import org.apache.celeborn.common.network.sasl.SaslCredentials;
+import org.apache.celeborn.common.network.sasl.SaslTimeoutException;
+import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.protocol.MessageType;
+import org.apache.celeborn.common.protocol.PbAuthType;
+import org.apache.celeborn.common.protocol.PbAuthenticationInitiationRequest;
+import org.apache.celeborn.common.protocol.PbAuthenticationInitiationResponse;
+import org.apache.celeborn.common.protocol.PbRegisterApplicationRequest;
+import org.apache.celeborn.common.protocol.PbRegisterApplicationResponse;
+import org.apache.celeborn.common.protocol.PbSaslMechanism;
+import org.apache.celeborn.common.protocol.PbSaslRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+/**
+ * Bootstraps a {@link TransportClient} by registering application (if the
application is not
+ * registered). If the application is already registered, it will bootstrap
the client by performing
+ * SASL authentication.
+ */
+public class RegistrationClientBootstrap implements TransportClientBootstrap {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RegistrationClientBootstrap.class);
+
+ private static final String VERSION = "1.0";
+
+ /**
+ * TODO: This should be made configurable. For now, we only support
ANONYMOUS for client-auth and
+ * DIGEST-MD5 for connect-auth.
+ */
+ private static final List<PbSaslMechanism> SASL_MECHANISMS =
+ Lists.newArrayList(
+ PbSaslMechanism.newBuilder()
+ .setMechanism(ANONYMOUS)
+ .addAuthTypes(PbAuthType.CLIENT_AUTH)
+ .build(),
+ PbSaslMechanism.newBuilder()
+ .setMechanism(DIGEST_MD5)
+ .addAuthTypes(PbAuthType.CONNECTION_AUTH)
+ .build());
+
+ private final TransportConf conf;
+ private final String appId;
+ private final SaslCredentials saslCredentials;
+
+ private final RegistrationInfo registrationInfo;
+
+ public RegistrationClientBootstrap(
+ TransportConf conf,
+ String appId,
+ SaslCredentials saslCredentials,
+ RegistrationInfo registrationInfo) {
+ this.conf = Preconditions.checkNotNull(conf, "conf");
+ this.appId = Preconditions.checkNotNull(appId, "appId");
+ this.saslCredentials = Preconditions.checkNotNull(saslCredentials,
"saslCredentials");
+ this.registrationInfo = Preconditions.checkNotNull(registrationInfo,
"registrationInfo");
+ }
+
+ @Override
+ public void doBootstrap(TransportClient client, Channel channel) throws
RuntimeException {
+ if (registrationInfo.getRegistrationState() ==
RegistrationInfo.RegistrationState.REGISTERED) {
+ LOG.info("client has already registered, skip register.");
+ doSaslBootstrap(client, channel);
+ return;
+ }
+ try {
+ LOG.info("authentication initiation started for {}", appId);
+ doAuthInitiation(client);
+ LOG.info("authentication initiation successful for {}", appId);
+ doClientAuthentication(client);
+ LOG.info("client authenticated for {}", appId);
+ register(client);
+ LOG.info("Registration for {}", appId);
+
registrationInfo.setRegistrationState(RegistrationInfo.RegistrationState.REGISTERED);
+ } catch (Throwable e) {
+ LOG.error("Registration failed for {}", appId, e);
+
registrationInfo.setRegistrationState(RegistrationInfo.RegistrationState.FAILED);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void doAuthInitiation(TransportClient client) throws IOException,
CelebornException {
+ PbAuthenticationInitiationRequest authInitRequest =
+ PbAuthenticationInitiationRequest.newBuilder()
+ .setVersion(VERSION)
+ .setAuthEnabled(true)
+ .addAllSaslMechanisms(SASL_MECHANISMS)
+ .build();
+ TransportMessage msg =
+ new TransportMessage(
+ MessageType.AUTHENTICATION_INITIATION_REQUEST,
authInitRequest.toByteArray());
+ ByteBuffer authInitResponseBuffer;
+ try {
+ authInitResponseBuffer = client.sendRpcSync(msg.toByteBuffer(),
conf.saslTimeoutMs());
+ } catch (RuntimeException ex) {
+ // TODO: Auth initiation timed out. Will just throw SaslTimeoutException
for now
Review Comment:
Throwing `SaslTimeoutException` is sufficient, right? What change is the
TODO meant to track?
##########
common/src/main/java/org/apache/celeborn/common/network/sasl/registration/RegistrationClientBootstrap.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.sasl.registration;
+
+import static org.apache.celeborn.common.network.sasl.SaslUtils.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.exception.CelebornException;
+import org.apache.celeborn.common.network.client.TransportClient;
+import org.apache.celeborn.common.network.client.TransportClientBootstrap;
+import org.apache.celeborn.common.network.protocol.TransportMessage;
+import org.apache.celeborn.common.network.sasl.CelebornSaslClient;
+import org.apache.celeborn.common.network.sasl.SaslClientBootstrap;
+import org.apache.celeborn.common.network.sasl.SaslCredentials;
+import org.apache.celeborn.common.network.sasl.SaslTimeoutException;
+import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.protocol.MessageType;
+import org.apache.celeborn.common.protocol.PbAuthType;
+import org.apache.celeborn.common.protocol.PbAuthenticationInitiationRequest;
+import org.apache.celeborn.common.protocol.PbAuthenticationInitiationResponse;
+import org.apache.celeborn.common.protocol.PbRegisterApplicationRequest;
+import org.apache.celeborn.common.protocol.PbRegisterApplicationResponse;
+import org.apache.celeborn.common.protocol.PbSaslMechanism;
+import org.apache.celeborn.common.protocol.PbSaslRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+/**
+ * Bootstraps a {@link TransportClient} by registering application (if the
application is not
+ * registered). If the application is already registered, it will bootstrap
the client by performing
+ * SASL authentication.
+ */
+public class RegistrationClientBootstrap implements TransportClientBootstrap {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RegistrationClientBootstrap.class);
+
+ private static final String VERSION = "1.0";
+
+ /**
+ * TODO: This should be made configurable. For now, we only support
ANONYMOUS for client-auth and
+ * DIGEST-MD5 for connect-auth.
+ */
+ private static final List<PbSaslMechanism> SASL_MECHANISMS =
+ Lists.newArrayList(
+ PbSaslMechanism.newBuilder()
+ .setMechanism(ANONYMOUS)
+ .addAuthTypes(PbAuthType.CLIENT_AUTH)
+ .build(),
+ PbSaslMechanism.newBuilder()
+ .setMechanism(DIGEST_MD5)
+ .addAuthTypes(PbAuthType.CONNECTION_AUTH)
+ .build());
+
+ private final TransportConf conf;
+ private final String appId;
+ private final SaslCredentials saslCredentials;
+
+ private final RegistrationInfo registrationInfo;
+
+ public RegistrationClientBootstrap(
+ TransportConf conf,
+ String appId,
+ SaslCredentials saslCredentials,
+ RegistrationInfo registrationInfo) {
+ this.conf = Preconditions.checkNotNull(conf, "conf");
+ this.appId = Preconditions.checkNotNull(appId, "appId");
+ this.saslCredentials = Preconditions.checkNotNull(saslCredentials,
"saslCredentials");
+ this.registrationInfo = Preconditions.checkNotNull(registrationInfo,
"registrationInfo");
+ }
+
+ @Override
+ public void doBootstrap(TransportClient client, Channel channel) throws
RuntimeException {
+ if (registrationInfo.getRegistrationState() ==
RegistrationInfo.RegistrationState.REGISTERED) {
+ LOG.info("client has already registered, skip register.");
+ doSaslBootstrap(client, channel);
+ return;
+ }
+ try {
+ LOG.info("authentication initiation started for {}", appId);
+ doAuthInitiation(client);
+ LOG.info("authentication initiation successful for {}", appId);
+ doClientAuthentication(client);
+ LOG.info("client authenticated for {}", appId);
+ register(client);
+ LOG.info("Registration for {}", appId);
+
registrationInfo.setRegistrationState(RegistrationInfo.RegistrationState.REGISTERED);
+ } catch (Throwable e) {
Review Comment:
Let us not catch `Throwable` and rethrow as RuntimeException here - but
handle the specific known subset we need.
Instead, handle it in a `finally` block to mark the registration state as failed.
Something like:
```
boolean failed = true;
try {
<try auth>
registrationInfo.setRegistrationState(RegistrationInfo.RegistrationState.REGISTERED);
failed = false;
} catch (IOException | CelebornException ex) {
<handle ...>
}
<other catch clauses>
finally {
if (failed) {
registrationInfo.setRegistrationState(RegistrationInfo.RegistrationState.FAILED);
}
}
```
##########
common/src/main/java/org/apache/celeborn/common/network/sasl/registration/RegistrationRpcHandler.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.sasl.registration;
+
+import static org.apache.celeborn.common.network.sasl.SaslUtils.*;
+import static org.apache.celeborn.common.protocol.MessageType.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.network.client.RpcResponseCallback;
+import org.apache.celeborn.common.network.client.TransportClient;
+import org.apache.celeborn.common.network.protocol.RequestMessage;
+import org.apache.celeborn.common.network.protocol.RpcFailure;
+import org.apache.celeborn.common.network.protocol.RpcRequest;
+import org.apache.celeborn.common.network.protocol.TransportMessage;
+import org.apache.celeborn.common.network.sasl.CelebornSaslServer;
+import org.apache.celeborn.common.network.sasl.SaslRpcHandler;
+import org.apache.celeborn.common.network.sasl.SecretRegistry;
+import org.apache.celeborn.common.network.server.BaseMessageHandler;
+import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.protocol.PbAuthType;
+import org.apache.celeborn.common.protocol.PbAuthenticationInitiationRequest;
+import org.apache.celeborn.common.protocol.PbAuthenticationInitiationResponse;
+import org.apache.celeborn.common.protocol.PbRegisterApplicationRequest;
+import org.apache.celeborn.common.protocol.PbRegisterApplicationResponse;
+import org.apache.celeborn.common.protocol.PbSaslMechanism;
+import org.apache.celeborn.common.protocol.PbSaslRequest;
+
+/**
+ * RPC Handler which registers an application. If an application is registered
or the connection is
+ * authenticated, subsequent messages are delegated to a child RPC handler.
+ */
+public class RegistrationRpcHandler extends BaseMessageHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(RegistrationRpcHandler.class);
+
+ private static final String VERSION = "1.0";
+
+ /**
+ * TODO: This should be made configurable. For now, we only support
ANONYMOUS for client-auth and
+ * DIGEST-MD5 for connect-auth.
+ */
+ private static final List<PbSaslMechanism> SASL_MECHANISMS =
+ Lists.newArrayList(
+ PbSaslMechanism.newBuilder()
+ .setMechanism(ANONYMOUS)
+ .addAuthTypes(PbAuthType.CLIENT_AUTH)
+ .build(),
+ PbSaslMechanism.newBuilder()
+ .setMechanism(DIGEST_MD5)
+ .addAuthTypes(PbAuthType.CONNECTION_AUTH)
+ .build());
+
+ /** Transport configuration. */
+ private final TransportConf conf;
+
+ /** The client channel. */
+ private final Channel channel;
+
+ private final BaseMessageHandler delegate;
+
+ private RegistrationState registrationState = RegistrationState.NONE;
+
+ /** Class which provides secret keys which are shared by server and client
on a per-app basis. */
+ private final SecretRegistry secretRegistry;
+
+ private SaslRpcHandler saslHandler;
+
+ /** Used for client authentication. */
+ private CelebornSaslServer saslServer = null;
+
+ public RegistrationRpcHandler(
+ TransportConf conf,
+ Channel channel,
+ BaseMessageHandler delegate,
+ SecretRegistry secretRegistry) {
+ this.conf = conf;
+ this.channel = channel;
+ this.secretRegistry = secretRegistry;
+ this.delegate = delegate;
+ this.saslHandler = new SaslRpcHandler(conf, channel, delegate,
secretRegistry);
+ }
+
+ // TODO: This has to check the delegate
+ @Override
+ public boolean checkRegistered() {
+ return delegate.checkRegistered();
+ }
+
+ @Override
+ public final void receive(
+ TransportClient client, RequestMessage message, RpcResponseCallback
callback) {
+ // The message is delegated either if the client is already authenticated
or if the connection
+ // is authenticated.
+ if (registrationState == RegistrationState.REGISTERED ||
saslHandler.isAuthenticated()) {
+ LOG.trace("Already authenticated. Delegating {}", client.getClientId());
Review Comment:
super nit: wrap with `LOG.isTraceEnabled` ?
##########
common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java:
##########
@@ -33,10 +33,12 @@ public static SecretRegistryImpl getInstance() {
private final ConcurrentHashMap<String, String> secrets = new
ConcurrentHashMap<>();
+ @Override
public void register(String appId, String secret) {
secrets.put(appId, secret);
Review Comment:
Put only if absent, and throw an exception if already present, to guard
against race conditions.
##########
common/src/main/java/org/apache/celeborn/common/network/sasl/registration/RegistrationClientBootstrap.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.sasl.registration;
+
+import static org.apache.celeborn.common.network.sasl.SaslUtils.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.exception.CelebornException;
+import org.apache.celeborn.common.network.client.TransportClient;
+import org.apache.celeborn.common.network.client.TransportClientBootstrap;
+import org.apache.celeborn.common.network.protocol.TransportMessage;
+import org.apache.celeborn.common.network.sasl.CelebornSaslClient;
+import org.apache.celeborn.common.network.sasl.SaslClientBootstrap;
+import org.apache.celeborn.common.network.sasl.SaslCredentials;
+import org.apache.celeborn.common.network.sasl.SaslTimeoutException;
+import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.protocol.MessageType;
+import org.apache.celeborn.common.protocol.PbAuthType;
+import org.apache.celeborn.common.protocol.PbAuthenticationInitiationRequest;
+import org.apache.celeborn.common.protocol.PbAuthenticationInitiationResponse;
+import org.apache.celeborn.common.protocol.PbRegisterApplicationRequest;
+import org.apache.celeborn.common.protocol.PbRegisterApplicationResponse;
+import org.apache.celeborn.common.protocol.PbSaslMechanism;
+import org.apache.celeborn.common.protocol.PbSaslRequest;
+import org.apache.celeborn.common.util.JavaUtils;
+
+/**
+ * Bootstraps a {@link TransportClient} by registering application (if the
application is not
+ * registered). If the application is already registered, it will bootstrap
the client by performing
+ * SASL authentication.
+ */
+public class RegistrationClientBootstrap implements TransportClientBootstrap {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RegistrationClientBootstrap.class);
+
+ private static final String VERSION = "1.0";
+
+ /**
+ * TODO: This should be made configurable. For now, we only support
ANONYMOUS for client-auth and
+ * DIGEST-MD5 for connect-auth.
+ */
+ private static final List<PbSaslMechanism> SASL_MECHANISMS =
+ Lists.newArrayList(
+ PbSaslMechanism.newBuilder()
+ .setMechanism(ANONYMOUS)
+ .addAuthTypes(PbAuthType.CLIENT_AUTH)
+ .build(),
+ PbSaslMechanism.newBuilder()
+ .setMechanism(DIGEST_MD5)
+ .addAuthTypes(PbAuthType.CONNECTION_AUTH)
+ .build());
+
+ private final TransportConf conf;
+ private final String appId;
+ private final SaslCredentials saslCredentials;
+
+ private final RegistrationInfo registrationInfo;
+
+ public RegistrationClientBootstrap(
+ TransportConf conf,
+ String appId,
+ SaslCredentials saslCredentials,
+ RegistrationInfo registrationInfo) {
+ this.conf = Preconditions.checkNotNull(conf, "conf");
+ this.appId = Preconditions.checkNotNull(appId, "appId");
+ this.saslCredentials = Preconditions.checkNotNull(saslCredentials,
"saslCredentials");
+ this.registrationInfo = Preconditions.checkNotNull(registrationInfo,
"registrationInfo");
+ }
+
+ @Override
+ public void doBootstrap(TransportClient client, Channel channel) throws
RuntimeException {
+ if (registrationInfo.getRegistrationState() ==
RegistrationInfo.RegistrationState.REGISTERED) {
Review Comment:
To clarify, we are allowing for repeated re-registration if the previous
requests failed; is this the expected behavior we are shooting for? Or will a
failure result in application termination?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]