Github user vijikarthi commented on the issue:
https://github.com/apache/flink/pull/2425
> The cookie is added to every single message/buffer that is transferred.
> That is too much - securing the integrity of the stream is the
> responsibility of the encryption layer. The cookie should be added only to
> request messages that establish connections.

I will change the code to handle the cookie right after the SSL handshake
using a new handler, and stop passing the cookie with every message.
The handler will be added to the pipelines of both `NettyClient` and
`NettyServer`. The client will send the cookie when the channel becomes
active, and the server will validate it and keep track of the clients that
are authorized.
Here is the pseudo-code for the client and server handlers. Please take a look
and let me know if you are okay with this approach, and I will modify the code.
---
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Both classes are meant to be nested inside an enclosing class (hence `static`).

public static class ClientCookieHandler extends ChannelInboundHandlerAdapter {

    private final String secureCookie;

    public ClientCookieHandler(String secureCookie) {
        this.secureCookie = secureCookie;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        if (secureCookie != null && !secureCookie.isEmpty()) {
            // Frame layout: 4-byte length followed by the UTF-8 cookie bytes.
            final byte[] cookieBytes = secureCookie.getBytes(StandardCharsets.UTF_8);
            final ByteBuf buffer = Unpooled.buffer(4 + cookieBytes.length);
            buffer.writeInt(cookieBytes.length);
            buffer.writeBytes(cookieBytes);
            ctx.writeAndFlush(buffer);
        }
    }
}

public static class ServerCookieDecoder extends MessageToMessageDecoder<ByteBuf> {

    private static final Logger LOG = LoggerFactory.getLogger(ServerCookieDecoder.class);

    private final String secureCookie;

    // Channels that already presented a valid cookie. Not thread-safe: this
    // assumes one decoder instance per channel pipeline.
    private final List<Channel> channelList = new ArrayList<>();

    public ServerCookieDecoder(String secureCookie) {
        this.secureCookie = secureCookie;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        // Cookie check disabled, or channel already authorized: pass the message on.
        // retain() is needed because MessageToMessageDecoder releases msg after decode().
        if (secureCookie == null || secureCookie.isEmpty() || channelList.contains(ctx.channel())) {
            out.add(msg.retain());
            return;
        }

        final byte[] expected = secureCookie.getBytes(StandardCharsets.UTF_8);

        // Read the cookie length. Assumes the whole cookie frame arrives in this buffer.
        final int cookieLength = msg.readInt();
        if (cookieLength != expected.length) {
            throw new IllegalStateException(
                "Cookie length does not match with source cookie. Invalid secure cookie passed.");
        }

        final byte[] received = new byte[cookieLength];
        msg.readBytes(received, 0, cookieLength);
        if (!Arrays.equals(expected, received)) {
            LOG.error("Secure cookie from the client does not match the server's cookie");
            throw new IllegalStateException("Invalid secure cookie passed.");
        }

        LOG.info("Secure cookie validation passed");
        channelList.add(ctx.channel());

        // Forward any payload bytes that followed the cookie in the same buffer.
        if (msg.isReadable()) {
            out.add(msg.retain());
        }
    }
}
---
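To make the wire format concrete, here is a minimal sketch of the frame the two handlers above exchange: a 4-byte big-endian length followed by the UTF-8 cookie bytes. It uses only `java.nio` so it can be run without Netty. `CookieFrame`, `encode` and `matches` are hypothetical names for illustration and are not part of the PR. The comparison uses `MessageDigest.isEqual`, which is a swapped-in choice: the pseudo-code above uses `Arrays.equals`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

/** Hypothetical helper (not part of Flink or Netty) illustrating the cookie frame layout. */
final class CookieFrame {

    private CookieFrame() {}

    /** Encodes the cookie as a 4-byte big-endian length followed by its UTF-8 bytes. */
    static byte[] encode(String cookie) {
        byte[] payload = cookie.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Returns true only if the frame starts with the expected cookie. */
    static boolean matches(byte[] frame, String expectedCookie) {
        byte[] expected = expectedCookie.getBytes(StandardCharsets.UTF_8);
        ByteBuffer in = ByteBuffer.wrap(frame);
        if (in.remaining() < Integer.BYTES) {
            return false; // not even a complete length prefix
        }
        int length = in.getInt();
        // Reject before allocating: an untrusted length must not drive the buffer size.
        if (length != expected.length || in.remaining() < length) {
            return false;
        }
        byte[] received = new byte[length];
        in.get(received);
        // Compare without short-circuiting on the first differing byte.
        return MessageDigest.isEqual(expected, received);
    }

    public static void main(String[] args) {
        byte[] frame = encode("s3cret");
        System.out.println(matches(frame, "s3cret")); // true
        System.out.println(matches(frame, "other!")); // false
    }
}
```

Unlike the decoder above, this sketch returns `false` for a short or mismatched frame instead of throwing, and it checks that enough bytes are available before reading. Whether the real handler should close the channel, throw, or wait for more bytes is left open here.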