[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15927451#comment-15927451 ]
ASF GitHub Bot commented on FLINK-3930: --------------------------------------- Github user WangTaoTheTonic commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r106335560 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CookieHandler.java --- @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.MessageToMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class CookieHandler { + + public static class ClientCookieHandler extends ChannelInboundHandlerAdapter { + + private final Logger LOG = LoggerFactory.getLogger(ClientCookieHandler.class); + + private final String secureCookie; + + final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); + + public ClientCookieHandler(String secureCookie) { + this.secureCookie = secureCookie; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + LOG.debug("In channelActive method of ClientCookieHandler"); + + if(this.secureCookie != null && this.secureCookie.length() != 0) { + LOG.debug("In channelActive method of ClientCookieHandler -> sending secure cookie"); + final ByteBuf buffer = Unpooled.buffer(4 + this.secureCookie.getBytes(DEFAULT_CHARSET).length); + buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length); + buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET)); + ctx.writeAndFlush(buffer); + } + } + } + + public static class ServerCookieDecoder extends MessageToMessageDecoder<ByteBuf> { + + private final String secureCookie; + + private final List<Channel> channelList = new ArrayList<>(); + + private final Charset DEFAULT_CHARSET = Charset.forName("utf-8"); + + private final Logger LOG = LoggerFactory.getLogger(ServerCookieDecoder.class); + + public ServerCookieDecoder(String secureCookie) { + this.secureCookie = secureCookie; + } + + /** + * Decode from one message to an other. This method will be called for each written message that can be handled + * by this encoder. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder} belongs to + * @param msg the message to decode to an other one + * @param out the {@link List} to which decoded messages should be added + * @throws Exception is thrown if an error accour + */ + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { + + LOG.debug("ChannelHandlerContext name: {}, channel: {}", ctx.name(), ctx.channel()); + + if(secureCookie == null || secureCookie.length() == 0) { + LOG.debug("Not validating secure cookie since the server configuration is not enabled to use cookie"); + return; + } + + LOG.debug("Going to decode the secure cookie passed by the remote client"); + + if(channelList.contains(ctx.channel())) { + LOG.debug("Channel: {} already authorized", ctx.channel()); + return; + } + + //read cookie based on the cookie length passed + int cookieLength = msg.readInt(); + if(cookieLength != secureCookie.getBytes(DEFAULT_CHARSET).length) { + ctx.channel().close(); + String message = "Cookie length does not match with source cookie. Invalid secure cookie passed."; + throw new IllegalStateException(message); + } + + //read only if cookie length is greater than zero + if(cookieLength > 0) { + + final byte[] buffer = new byte[secureCookie.getBytes(DEFAULT_CHARSET).length]; + msg.readBytes(buffer, 0, cookieLength); + + if(!Arrays.equals(secureCookie.getBytes(DEFAULT_CHARSET), buffer)) { + LOG.error("Secure cookie from the client is not matching with the server's identity"); + throw new IllegalStateException("Invalid secure cookie passed."); + } + + LOG.info("Secure cookie validation passed"); + + channelList.add(ctx.channel()); --- End diff -- It seems like we only have channels added, but not removed. Would this list increase indefinitely? > Implement Service-Level Authorization > ------------------------------------- > > Key: FLINK-3930 > URL: https://issues.apache.org/jira/browse/FLINK-3930 > Project: Flink > Issue Type: New Feature > Components: Security > Reporter: Eron Wright > Assignee: Vijay Srinivasaraghavan > Labels: security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Service-level authorization is the initial authorization mechanism to ensure > clients (or servers) connecting to the Flink cluster are authorized to do so. > The purpose is to prevent a cluster from being used by an unauthorized > user, whether to execute jobs, disrupt cluster functionality, or gain access > to secrets stored within the cluster. > Implement service-level authorization as described in the design doc. > - Introduce a shared secret cookie > - Enable Akka security cookie > - Implement data transfer authentication > - Secure the web dashboard -- This message was sent by Atlassian JIRA (v6.3.15#6346)