[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15642713#comment-15642713
 ] 

ASF GitHub Bot commented on FLINK-3930:
---------------------------------------

Github user vijikarthi commented on the issue:

    https://github.com/apache/flink/pull/2425
  
    >
    The cookie is added to every single message/buffer that is transferred. 
That is too much - securing the integrity of the stream is responsibility of 
the encryption layer. The cookie should be added to requests messages that 
establish connections only.
    
    I will change the code to address cookie handling right after the SSL 
handshake using a new handler and drop the cookie passing for every messages. 
The handler will be added to the pipeline of both `NettyClient` and 
`NettyServer`. Client will send the cookie when the channel becomes active and 
the server will validate and keep track of the clients that are authorized. 
    
    Here is the pseudo-code for Client and Server handlers. Please take a look 
and let me know if you are okay with this approach and I will modify the code.
    
    ---
        public static class ClientCookieHandler extends 
ChannelInboundHandlerAdapter {
    
                private final String secureCookie;
    
                final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
    
                public ClientCookieHandler(String secureCookie) {
                        this.secureCookie = secureCookie;
                }
    
    
                @Override
                public void channelActive(ChannelHandlerContext ctx) throws 
Exception {
                        super.channelActive(ctx);
    
                        if(this.secureCookie != null && 
this.secureCookie.length() != 0) {
                                final ByteBuf buffer = Unpooled.buffer(4 + 
this.secureCookie.getBytes(DEFAULT_CHARSET).length);
                                
buffer.writeInt(secureCookie.getBytes(DEFAULT_CHARSET).length);
                                
buffer.writeBytes(secureCookie.getBytes(DEFAULT_CHARSET));
                                ctx.writeAndFlush(buffer);
                        }
                }
        }
    
        public static class ServerCookieDecoder extends 
MessageToMessageDecoder<ByteBuf> {
    
                private final String secureCookie;
    
                private final List<Channel> channelList = new ArrayList<>();
    
                private final Charset DEFAULT_CHARSET = 
Charset.forName("utf-8");
    
               public ServerCookieDecoder(String secureCookie) {
                        this.secureCookie = secureCookie;
                }
    
                @Override
                protected void decode(ChannelHandlerContext ctx, ByteBuf msg, 
List<Object> out) throws Exception {
    
                        if(secureCookie == null || secureCookie.length() == 0) {
                                return;
                        }
    
                        if(channelList.contains(ctx.channel())) {
                                return;
                        }
    
                        //read cookie based on the cookie length passed
                        int cookieLength = msg.readInt();
                        if(cookieLength != 
secureCookie.getBytes(DEFAULT_CHARSET).length) {
                                String message = "Cookie length does not match 
with source cookie. Invalid secure cookie passed.";
                                throw new IllegalStateException(message);
                        }
    
                        //read only if cookie length is greater than zero
                        if(cookieLength > 0) {
    
                                final byte[] buffer = new 
byte[secureCookie.getBytes(DEFAULT_CHARSET).length];
                                msg.readBytes(buffer, 0, cookieLength);
    
                                
if(!Arrays.equals(secureCookie.getBytes(DEFAULT_CHARSET), buffer)) {
                                        LOG.error("Secure cookie from the 
client is not matching with the server's identity");
                                        throw new 
IllegalStateException("Invalid secure cookie passed.");
                                }
    
                                LOG.info("Secure cookie validation passed");
    
                                channelList.add(ctx.channel());
                        }
    
                }
        }
    --- 


> Implement Service-Level Authorization
> -------------------------------------
>
>                 Key: FLINK-3930
>                 URL: https://issues.apache.org/jira/browse/FLINK-3930
>             Project: Flink
>          Issue Type: New Feature
>          Components: Security
>            Reporter: Eron Wright 
>            Assignee: Vijay Srinivasaraghavan
>              Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to