http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java deleted file mode 100644 index 18a88da..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.queryablestate.network.messages.MessageBody; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.queryablestate.network.messages.MessageType; -import org.apache.flink.runtime.query.netty.KvStateRequestStats; -import org.apache.flink.runtime.state.internal.InternalKvState; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; -import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * The base class of every handler used by an {@link AbstractServerBase}. - * - * @param <REQ> the type of request the server expects to receive. - * @param <RESP> the type of response the server will send. - */ -@Internal [email protected] -public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractServerHandler.class); - - /** The owning server of this handler. */ - private final AbstractServerBase<REQ, RESP> server; - - /** The serializer used to (de-)serialize messages. */ - private final MessageSerializer<REQ, RESP> serializer; - - /** Thread pool for query execution. */ - protected final ExecutorService queryExecutor; - - /** Exposed server statistics. */ - private final KvStateRequestStats stats; - - /** - * Create the handler. - * - * @param serializer the serializer used to (de-)serialize messages - * @param stats statistics collector - */ - public AbstractServerHandler( - final AbstractServerBase<REQ, RESP> server, - final MessageSerializer<REQ, RESP> serializer, - final KvStateRequestStats stats) { - - this.server = Preconditions.checkNotNull(server); - this.serializer = Preconditions.checkNotNull(serializer); - this.queryExecutor = server.getQueryExecutor(); - this.stats = Preconditions.checkNotNull(stats); - } - - protected String getServerName() { - return server.getServerName(); - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - stats.reportActiveConnection(); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - stats.reportInactiveConnection(); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - REQ request = null; - long requestId = -1L; - - try { - final ByteBuf buf = (ByteBuf) msg; - final MessageType msgType = MessageSerializer.deserializeHeader(buf); - - requestId = MessageSerializer.getRequestId(buf); - - if (msgType == MessageType.REQUEST) { - - // ------------------------------------------------------------ - // MessageBody - // ------------------------------------------------------------ - request = serializer.deserializeRequest(buf); - stats.reportRequest(); - - // Execute actual query async, because it is possibly - // blocking (e.g. file I/O). - // - // A submission failure is not treated as fatal. - queryExecutor.submit(new AsyncRequestTask<>(this, ctx, requestId, request, stats)); - - } else { - // ------------------------------------------------------------ - // Unexpected - // ------------------------------------------------------------ - - final String errMsg = "Unexpected message type " + msgType + ". Expected " + MessageType.REQUEST + "."; - final ByteBuf failure = MessageSerializer.serializeServerFailure(ctx.alloc(), new IllegalArgumentException(errMsg)); - - LOG.debug(errMsg); - ctx.writeAndFlush(failure); - } - } catch (Throwable t) { - final String stringifiedCause = ExceptionUtils.stringifyException(t); - - String errMsg; - ByteBuf err; - if (request != null) { - errMsg = "Failed request with ID " + requestId + ". Caused by: " + stringifiedCause; - err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg)); - stats.reportFailedRequest(); - } else { - errMsg = "Failed incoming message. Caused by: " + stringifiedCause; - err = MessageSerializer.serializeServerFailure(ctx.alloc(), new RuntimeException(errMsg)); - } - - LOG.debug(errMsg); - ctx.writeAndFlush(err); - - } finally { - // IMPORTANT: We have to always recycle the incoming buffer. - // Otherwise we will leak memory out of Netty's buffer pool. - // - // If any operation ever holds on to the buffer, it is the - // responsibility of that operation to retain the buffer and - // release it later. - ReferenceCountUtil.release(msg); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - final String msg = "Exception in server pipeline. Caused by: " + ExceptionUtils.stringifyException(cause); - final ByteBuf err = serializer.serializeServerFailure(ctx.alloc(), new RuntimeException(msg)); - - LOG.debug(msg); - ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE); - } - - /** - * Handles an incoming request and returns a {@link CompletableFuture} containing the corresponding response. - * - * <p><b>NOTE:</b> This method is called by multiple threads. - * - * @param requestId the id of the received request to be handled. - * @param request the request to be handled. - * @return A future with the response to be forwarded to the client. - */ - public abstract CompletableFuture<RESP> handleRequest(final long requestId, final REQ request); - - /** - * Shuts down any handler specific resources, e.g. thread pools etc. - */ - public abstract void shutdown(); - - /** - * Task to execute the actual query against the {@link InternalKvState} instance. - */ - private static class AsyncRequestTask<REQ extends MessageBody, RESP extends MessageBody> implements Runnable { - - private final AbstractServerHandler<REQ, RESP> handler; - - private final ChannelHandlerContext ctx; - - private final long requestId; - - private final REQ request; - - private final KvStateRequestStats stats; - - private final long creationNanos; - - AsyncRequestTask( - final AbstractServerHandler<REQ, RESP> handler, - final ChannelHandlerContext ctx, - final long requestId, - final REQ request, - final KvStateRequestStats stats) { - - this.handler = Preconditions.checkNotNull(handler); - this.ctx = Preconditions.checkNotNull(ctx); - this.requestId = requestId; - this.request = Preconditions.checkNotNull(request); - this.stats = Preconditions.checkNotNull(stats); - this.creationNanos = System.nanoTime(); - } - - @Override - public void run() { - - if (!ctx.channel().isActive()) { - return; - } - - handler.handleRequest(requestId, request).whenComplete((resp, throwable) -> { - try { - if (throwable != null) { - throw throwable instanceof CompletionException - ? throwable.getCause() - : throwable; - } - - if (resp == null) { - throw new BadRequestException(handler.getServerName(), "NULL returned for request with ID " + requestId + "."); - } - - final ByteBuf serialResp = MessageSerializer.serializeResponse(ctx.alloc(), requestId, resp); - - int highWatermark = ctx.channel().config().getWriteBufferHighWaterMark(); - - ChannelFuture write; - if (serialResp.readableBytes() <= highWatermark) { - write = ctx.writeAndFlush(serialResp); - } else { - write = ctx.writeAndFlush(new ChunkedByteBuf(serialResp, highWatermark)); - } - write.addListener(new RequestWriteListener()); - - } catch (BadRequestException e) { - try { - stats.reportFailedRequest(); - final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, e); - ctx.writeAndFlush(err); - } catch (IOException io) { - LOG.error("Failed to respond with the error after failed request", io); - } - } catch (Throwable t) { - try { - stats.reportFailedRequest(); - - final String errMsg = "Failed request " + requestId + ". Caused by: " + ExceptionUtils.stringifyException(t); - final ByteBuf err = MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, new RuntimeException(errMsg)); - ctx.writeAndFlush(err); - } catch (IOException io) { - LOG.error("Failed to respond with the error after failed request", io); - } - } - }); - } - - @Override - public String toString() { - return "AsyncRequestTask{" + - "requestId=" + requestId + - ", request=" + request + - '}'; - } - - /** - * Callback after query result has been written. - * - * <p>Gathers stats and logs errors. - */ - private class RequestWriteListener implements ChannelFutureListener { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - long durationNanos = System.nanoTime() - creationNanos; - long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS); - - if (future.isSuccess()) { - LOG.debug("Request {} was successfully answered after {} ms.", request, durationMillis); - stats.reportSuccessfulRequest(durationMillis); - } else { - LOG.debug("Request {} failed after {} ms : ", request, durationMillis, future.cause()); - stats.reportFailedRequest(); - } - } - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java deleted file mode 100644 index 3c0c484..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/BadRequestException.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.util.Preconditions; - -/** - * Base class for exceptions thrown during querying Flink's managed state. - */ -@Internal -public class BadRequestException extends Exception { - - private static final long serialVersionUID = 3458743952407632903L; - - public BadRequestException(String serverName, String message) { - super(Preconditions.checkNotNull(serverName) + " : " + message); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java deleted file mode 100644 index 9c56025..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput; -import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; - -/** - * A {@link ByteBuf} instance to be consumed in chunks by {@link ChunkedWriteHandler}, - * respecting the high and low watermarks. - * - * @see <a href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High Watermarks</a> - */ -@Internal -public class ChunkedByteBuf implements ChunkedInput<ByteBuf> { - - /** The buffer to chunk. */ - private final ByteBuf buf; - - /** Size of chunks. */ - private final int chunkSize; - - /** Closed flag. */ - private boolean isClosed; - - /** End of input flag. */ - private boolean isEndOfInput; - - public ChunkedByteBuf(ByteBuf buf, int chunkSize) { - this.buf = Preconditions.checkNotNull(buf, "Buffer"); - Preconditions.checkArgument(chunkSize > 0, "Non-positive chunk size"); - this.chunkSize = chunkSize; - } - - @Override - public boolean isEndOfInput() throws Exception { - return isClosed || isEndOfInput; - } - - @Override - public void close() throws Exception { - if (!isClosed) { - // If we did not consume the whole buffer yet, we have to release - // it here. Otherwise, it's the responsibility of the consumer. - if (!isEndOfInput) { - buf.release(); - } - - isClosed = true; - } - } - - @Override - public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { - if (isClosed) { - return null; - } else if (buf.readableBytes() <= chunkSize) { - isEndOfInput = true; - - // Don't retain as the consumer is responsible to release it - return buf.slice(); - } else { - // Return a chunk sized slice of the buffer. The ref count is - // shared with the original buffer. That's why we need to retain - // a reference here. - return buf.readSlice(chunkSize).retain(); - } - } - - @Override - public String toString() { - return "ChunkedByteBuf{" + - "buf=" + buf + - ", chunkSize=" + chunkSize + - ", isClosed=" + isClosed + - ", isEndOfInput=" + isEndOfInput + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java deleted file mode 100644 index e6d59de..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java +++ /dev/null @@ -1,537 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.queryablestate.network.messages.MessageBody; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.io.network.netty.NettyBufferPool; -import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.query.netty.KvStateRequestStats; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; -import org.apache.flink.shaded.netty4.io.netty.channel.Channel; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; -import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; - -import java.nio.channels.ClosedChannelException; -import java.util.ArrayDeque; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -/** - * The base class for every client in the queryable state module. - * It is using pure netty to send and receive messages of type {@link MessageBody}. - * - * @param <REQ> the type of request the client will send. - * @param <RESP> the type of response the client expects to receive. - */ -@Internal -public class Client<REQ extends MessageBody, RESP extends MessageBody> { - - /** The name of the client. Used for logging and stack traces.*/ - private final String clientName; - - /** Netty's Bootstrap. */ - private final Bootstrap bootstrap; - - /** The serializer to be used for (de-)serializing messages. */ - private final MessageSerializer<REQ, RESP> messageSerializer; - - /** Statistics tracker. */ - private final KvStateRequestStats stats; - - /** Established connections. */ - private final Map<KvStateServerAddress, EstablishedConnection> establishedConnections = new ConcurrentHashMap<>(); - - /** Pending connections. */ - private final Map<KvStateServerAddress, PendingConnection> pendingConnections = new ConcurrentHashMap<>(); - - /** Atomic shut down flag. */ - private final AtomicBoolean shutDown = new AtomicBoolean(); - - /** - * Creates a client with the specified number of event loop threads. - * - * @param clientName the name of the client. - * @param numEventLoopThreads number of event loop threads (minimum 1). - * @param serializer the serializer used to (de-)serialize messages. - * @param stats the statistics collector. - */ - public Client( - final String clientName, - final int numEventLoopThreads, - final MessageSerializer<REQ, RESP> serializer, - final KvStateRequestStats stats) { - - Preconditions.checkArgument(numEventLoopThreads >= 1, - "Non-positive number of event loop threads."); - - this.clientName = Preconditions.checkNotNull(clientName); - this.messageSerializer = Preconditions.checkNotNull(serializer); - this.stats = Preconditions.checkNotNull(stats); - - final ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Flink " + clientName + " Event Loop Thread %d") - .build(); - - final EventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory); - final ByteBufAllocator bufferPool = new NettyBufferPool(numEventLoopThreads); - - this.bootstrap = new Bootstrap() - .group(nioGroup) - .channel(NioSocketChannel.class) - .option(ChannelOption.ALLOCATOR, bufferPool) - .handler(new ChannelInitializer<SocketChannel>() { - @Override - protected void initChannel(SocketChannel channel) throws Exception { - channel.pipeline() - .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) - .addLast(new ChunkedWriteHandler()); - } - }); - } - - public String getClientName() { - return clientName; - } - - public CompletableFuture<RESP> sendRequest(final KvStateServerAddress serverAddress, final REQ request) { - if (shutDown.get()) { - return FutureUtils.getFailedFuture(new IllegalStateException("Shut down")); - } - - EstablishedConnection connection = establishedConnections.get(serverAddress); - if (connection != null) { - return connection.sendRequest(request); - } else { - PendingConnection pendingConnection = pendingConnections.get(serverAddress); - if (pendingConnection != null) { - // There was a race, use the existing pending connection. - return pendingConnection.sendRequest(request); - } else { - // We try to connect to the server. - PendingConnection pending = new PendingConnection(serverAddress, messageSerializer); - PendingConnection previous = pendingConnections.putIfAbsent(serverAddress, pending); - - if (previous == null) { - // OK, we are responsible to connect. - bootstrap.connect(serverAddress.getHost(), serverAddress.getPort()).addListener(pending); - return pending.sendRequest(request); - } else { - // There was a race, use the existing pending connection. - return previous.sendRequest(request); - } - } - } - } - - /** - * Shuts down the client and closes all connections. - * - * <p>After a call to this method, all returned futures will be failed. - */ - public void shutdown() { - if (shutDown.compareAndSet(false, true)) { - for (Map.Entry<KvStateServerAddress, EstablishedConnection> conn : establishedConnections.entrySet()) { - if (establishedConnections.remove(conn.getKey(), conn.getValue())) { - conn.getValue().close(); - } - } - - for (Map.Entry<KvStateServerAddress, PendingConnection> conn : pendingConnections.entrySet()) { - if (pendingConnections.remove(conn.getKey()) != null) { - conn.getValue().close(); - } - } - - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); - } - } - } - } - - /** - * A pending connection that is in the process of connecting. - */ - private class PendingConnection implements ChannelFutureListener { - - /** Lock to guard the connect call, channel hand in, etc. */ - private final Object connectLock = new Object(); - - /** Address of the server we are connecting to. */ - private final KvStateServerAddress serverAddress; - - private final MessageSerializer<REQ, RESP> serializer; - - /** Queue of requests while connecting. */ - private final ArrayDeque<PendingRequest> queuedRequests = new ArrayDeque<>(); - - /** The established connection after the connect succeeds. */ - private EstablishedConnection established; - - /** Closed flag. */ - private boolean closed; - - /** Failure cause if something goes wrong. */ - private Throwable failureCause; - - /** - * Creates a pending connection to the given server. - * - * @param serverAddress Address of the server to connect to. - */ - private PendingConnection( - final KvStateServerAddress serverAddress, - final MessageSerializer<REQ, RESP> serializer) { - this.serverAddress = serverAddress; - this.serializer = serializer; - } - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - handInChannel(future.channel()); - } else { - close(future.cause()); - } - } - - /** - * Returns a future holding the serialized request result. - * - * <p>If the channel has been established, forward the call to the - * established channel, otherwise queue it for when the channel is - * handed in. - * - * @param request the request to be sent. - * @return Future holding the serialized result - */ - public CompletableFuture<RESP> sendRequest(REQ request) { - synchronized (connectLock) { - if (failureCause != null) { - return FutureUtils.getFailedFuture(failureCause); - } else if (closed) { - return FutureUtils.getFailedFuture(new ClosedChannelException()); - } else { - if (established != null) { - return established.sendRequest(request); - } else { - // Queue this and handle when connected - final PendingRequest pending = new PendingRequest(request); - queuedRequests.add(pending); - return pending; - } - } - } - } - - /** - * Hands in a channel after a successful connection. - * - * @param channel Channel to hand in - */ - private void handInChannel(Channel channel) { - synchronized (connectLock) { - if (closed || failureCause != null) { - // Close the channel and we are done. Any queued requests - // are removed on the close/failure call and after that no - // new ones can be enqueued. - channel.close(); - } else { - established = new EstablishedConnection(serverAddress, serializer, channel); - - while (!queuedRequests.isEmpty()) { - final PendingRequest pending = queuedRequests.poll(); - - established.sendRequest(pending.request) - .thenAccept(resp -> pending.complete(resp)) - .exceptionally(throwable -> { - pending.completeExceptionally(throwable); - return null; - }); - } - - // Publish the channel for the general public - establishedConnections.put(serverAddress, established); - pendingConnections.remove(serverAddress); - - // Check shut down for possible race with shut down. We - // don't want any lingering connections after shut down, - // which can happen if we don't check this here. - if (shutDown.get()) { - if (establishedConnections.remove(serverAddress, established)) { - established.close(); - } - } - } - } - } - - /** - * Close the connecting channel with a ClosedChannelException. - */ - private void close() { - close(new ClosedChannelException()); - } - - /** - * Close the connecting channel with an Exception (can be {@code null}) - * or forward to the established channel. - */ - private void close(Throwable cause) { - synchronized (connectLock) { - if (!closed) { - if (failureCause == null) { - failureCause = cause; - } - - if (established != null) { - established.close(); - } else { - PendingRequest pending; - while ((pending = queuedRequests.poll()) != null) { - pending.completeExceptionally(cause); - } - } - closed = true; - } - } - } - - @Override - public String toString() { - synchronized (connectLock) { - return "PendingConnection{" + - "serverAddress=" + serverAddress + - ", queuedRequests=" + queuedRequests.size() + - ", established=" + (established != null) + - ", closed=" + closed + - '}'; - } - } - - /** - * A pending request queued while the channel is connecting. - */ - private final class PendingRequest extends CompletableFuture<RESP> { - - private final REQ request; - - private PendingRequest(REQ request) { - this.request = request; - } - } - } - - /** - * An established connection that wraps the actual channel instance and is - * registered at the {@link ClientHandler} for callbacks. - */ - private class EstablishedConnection implements ClientHandlerCallback<RESP> { - - /** Address of the server we are connected to. */ - private final KvStateServerAddress serverAddress; - - /** The actual TCP channel. */ - private final Channel channel; - - /** Pending requests keyed by request ID. */ - private final ConcurrentHashMap<Long, TimestampedCompletableFuture> pendingRequests = new ConcurrentHashMap<>(); - - /** Current request number used to assign unique request IDs. */ - private final AtomicLong requestCount = new AtomicLong(); - - /** Reference to a failure that was reported by the channel. */ - private final AtomicReference<Throwable> failureCause = new AtomicReference<>(); - - /** - * Creates an established connection with the given channel. - * - * @param serverAddress Address of the server connected to - * @param channel The actual TCP channel - */ - EstablishedConnection( - final KvStateServerAddress serverAddress, - final MessageSerializer<REQ, RESP> serializer, - final Channel channel) { - - this.serverAddress = Preconditions.checkNotNull(serverAddress); - this.channel = Preconditions.checkNotNull(channel); - - // Add the client handler with the callback - channel.pipeline().addLast( - getClientName() + " Handler", - new ClientHandler<>(clientName, serializer, this) - ); - - stats.reportActiveConnection(); - } - - /** - * Close the channel with a ClosedChannelException. - */ - void close() { - close(new ClosedChannelException()); - } - - /** - * Close the channel with a cause. - * - * @param cause The cause to close the channel with. - * @return Channel close future - */ - private boolean close(Throwable cause) { - if (failureCause.compareAndSet(null, cause)) { - channel.close(); - stats.reportInactiveConnection(); - - for (long requestId : pendingRequests.keySet()) { - TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(cause)) { - stats.reportFailedRequest(); - } - } - return true; - } - return false; - } - - /** - * Returns a future holding the serialized request result. - * @param request the request to be sent. - * @return Future holding the serialized result - */ - CompletableFuture<RESP> sendRequest(REQ request) { - TimestampedCompletableFuture requestPromiseTs = - new TimestampedCompletableFuture(System.nanoTime()); - try { - final long requestId = requestCount.getAndIncrement(); - pendingRequests.put(requestId, requestPromiseTs); - - stats.reportRequest(); - - ByteBuf buf = MessageSerializer.serializeRequest(channel.alloc(), requestId, request); - - channel.writeAndFlush(buf).addListener((ChannelFutureListener) future -> { - if (!future.isSuccess()) { - // Fail promise if not failed to write - TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(future.cause())) { - stats.reportFailedRequest(); - } - } - }); - - // Check failure for possible race. We don't want any lingering - // promises after a failure, which can happen if we don't check - // this here. Note that close is treated as a failure as well. - Throwable failure = failureCause.get(); - if (failure != null) { - // Remove from pending requests to guard against concurrent - // removal and to make sure that we only count it once as failed. - TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(failure)) { - stats.reportFailedRequest(); - } - } - } catch (Throwable t) { - requestPromiseTs.completeExceptionally(t); - } - - return requestPromiseTs; - } - - @Override - public void onRequestResult(long requestId, RESP response) { - TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.complete(response)) { - long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L; - stats.reportSuccessfulRequest(durationMillis); - } - } - - @Override - public void onRequestFailure(long requestId, Throwable cause) { - TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(cause)) { - stats.reportFailedRequest(); - } - } - - @Override - public void onFailure(Throwable cause) { - if (close(cause)) { - // Remove from established channels, otherwise future - // requests will be handled by this failed channel. - establishedConnections.remove(serverAddress, this); - } - } - - @Override - public String toString() { - return "EstablishedConnection{" + - "serverAddress=" + serverAddress + - ", channel=" + channel + - ", pendingRequests=" + pendingRequests.size() + - ", requestCount=" + requestCount + - ", failureCause=" + failureCause + - '}'; - } - - /** - * Pair of promise and a timestamp. - */ - private class TimestampedCompletableFuture extends CompletableFuture<RESP> { - - private final long timestampInNanos; - - TimestampedCompletableFuture(long timestampInNanos) { - this.timestampInNanos = timestampInNanos; - } - - public long getTimestamp() { - return timestampInNanos; - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java deleted file mode 100644 index fc9b1d4..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.queryablestate.network.messages.MessageBody; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.queryablestate.network.messages.MessageType; -import org.apache.flink.queryablestate.network.messages.RequestFailure; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; -import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.channels.ClosedChannelException; - -/** - * The handler used by a {@link Client} to handling incoming messages. - * - * @param <REQ> the type of request the client will send. - * @param <RESP> the type of response the client expects to receive. - */ -@Internal -public class ClientHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter { - - private static final Logger LOG = LoggerFactory.getLogger(ClientHandler.class); - - private final String clientName; - - private final MessageSerializer<REQ, RESP> serializer; - - private final ClientHandlerCallback<RESP> callback; - - /** - * Creates a handler with the callback. - * - * @param clientName the name of the client. - * @param serializer the serializer used to (de-)serialize messages. - * @param callback Callback for responses. - */ - public ClientHandler( - final String clientName, - final MessageSerializer<REQ, RESP> serializer, - final ClientHandlerCallback<RESP> callback) { - - this.clientName = Preconditions.checkNotNull(clientName); - this.serializer = Preconditions.checkNotNull(serializer); - this.callback = Preconditions.checkNotNull(callback); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - try { - ByteBuf buf = (ByteBuf) msg; - MessageType msgType = MessageSerializer.deserializeHeader(buf); - - if (msgType == MessageType.REQUEST_RESULT) { - long requestId = MessageSerializer.getRequestId(buf); - RESP result = serializer.deserializeResponse(buf); - callback.onRequestResult(requestId, result); - } else if (msgType == MessageType.REQUEST_FAILURE) { - RequestFailure failure = MessageSerializer.deserializeRequestFailure(buf); - callback.onRequestFailure(failure.getRequestId(), failure.getCause()); - } else if (msgType == MessageType.SERVER_FAILURE) { - throw MessageSerializer.deserializeServerFailure(buf); - } else { - throw new IllegalStateException("Unexpected response type '" + msgType + "'"); - } - } catch (Throwable t1) { - try { - callback.onFailure(t1); - } catch (Throwable t2) { - LOG.error("Failed to notify callback about failure", t2); - } - } finally { - ReferenceCountUtil.release(msg); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - try { - callback.onFailure(cause); - } catch (Throwable t) { - LOG.error("Failed to notify callback about failure", t); - } - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - // Only the client is expected to close the channel. Otherwise it - // indicates a failure. Note that this will be invoked in both cases - // though. If the callback closed the channel, the callback must be - // ignored. - try { - callback.onFailure(new ClosedChannelException()); - } catch (Throwable t) { - LOG.error("Failed to notify callback about failure", t); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java deleted file mode 100644 index 00ce1ed..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.queryablestate.network.messages.MessageBody; - -/** - * Callback for {@link ClientHandler}. - */ -@Internal -public interface ClientHandlerCallback<RESP extends MessageBody> { - - /** - * Called on a successful request. - * - * @param requestId ID of the request - * @param response The received response - */ - void onRequestResult(long requestId, RESP response); - - /** - * Called on a failed request. - * - * @param requestId ID of the request - * @param cause Cause of the request failure - */ - void onRequestFailure(long requestId, Throwable cause); - - /** - * Called on any failure, which is not related to a specific request. - * - * <p>This can be for example a caught Exception in the channel pipeline - * or an unexpected channel close. - * - * @param cause Cause of the failure - */ - void onFailure(Throwable cause); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java deleted file mode 100644 index f26c267..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network.messages; - -import org.apache.flink.annotation.Internal; - -/** - * The base class for every message exchanged during the communication between - * {@link org.apache.flink.queryablestate.network.Client client} and - * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}. - * - * <p>Every such message should also have a {@link MessageDeserializer}. - */ -@Internal -public abstract class MessageBody { - - /** - * Serializes the message into a byte array. - * @return A byte array with the serialized content of the message. - */ - public abstract byte[] serialize(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java deleted file mode 100644 index 436fb82..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network.messages; - -import org.apache.flink.annotation.Internal; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; - -/** - * A utility used to deserialize a {@link MessageBody message}. - * @param <M> The type of the message to be deserialized. - * It has to extend {@link MessageBody} - */ -@Internal -public interface MessageDeserializer<M extends MessageBody> { - - /** - * Deserializes a message contained in a byte buffer. - * @param buf the buffer containing the message. - * @return The deserialized message. - */ - M deserializeMessage(ByteBuf buf); -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java deleted file mode 100644 index c0a0d32..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network.messages; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufOutputStream; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutput; -import java.io.ObjectOutputStream; - -/** - * Serialization and deserialization of messages exchanged between - * {@link org.apache.flink.queryablestate.network.Client client} and - * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}. - * - * <p>The binary messages have the following format: - * - * <pre> - * <------ Frame -------------------------> - * +----------------------------------------+ - * | HEADER (8) | PAYLOAD (VAR) | - * +------------------+----------------------------------------+ - * | FRAME LENGTH (4) | VERSION (4) | TYPE (4) | CONTENT (VAR) | - * +------------------+----------------------------------------+ - * </pre> - * - * <p>The concrete content of a message depends on the {@link MessageType}. - * - * @param <REQ> Type of the requests of the protocol. - * @param <RESP> Type of the responses of the protocol. - */ -@Internal -public final class MessageSerializer<REQ extends MessageBody, RESP extends MessageBody> { - - /** The serialization version ID. */ - private static final int VERSION = 0x79a1b710; - - /** Byte length of the header. */ - private static final int HEADER_LENGTH = 2 * Integer.BYTES; - - /** Byte length of the request id. */ - private static final int REQUEST_ID_SIZE = Long.BYTES; - - /** The constructor of the {@link MessageBody client requests}. Used for deserialization. */ - private final MessageDeserializer<REQ> requestDeserializer; - - /** The constructor of the {@link MessageBody server responses}. Used for deserialization. */ - private final MessageDeserializer<RESP> responseDeserializer; - - public MessageSerializer(MessageDeserializer<REQ> requestDeser, MessageDeserializer<RESP> responseDeser) { - requestDeserializer = Preconditions.checkNotNull(requestDeser); - responseDeserializer = Preconditions.checkNotNull(responseDeser); - } - - // ------------------------------------------------------------------------ - // Serialization - // ------------------------------------------------------------------------ - - /** - * Serializes the request sent to the - * {@link org.apache.flink.queryablestate.network.AbstractServerBase}. - * - * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. - * @param requestId The id of the request to which the message refers to. - * @param request The request to be serialized. - * @return A {@link ByteBuf} containing the serialized message. - */ - public static <REQ extends MessageBody> ByteBuf serializeRequest( - final ByteBufAllocator alloc, - final long requestId, - final REQ request) { - Preconditions.checkNotNull(request); - return writePayload(alloc, requestId, MessageType.REQUEST, request.serialize()); - } - - /** - * Serializes the response sent to the - * {@link org.apache.flink.queryablestate.network.Client}. - * - * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. - * @param requestId The id of the request to which the message refers to. - * @param response The response to be serialized. - * @return A {@link ByteBuf} containing the serialized message. - */ - public static <RESP extends MessageBody> ByteBuf serializeResponse( - final ByteBufAllocator alloc, - final long requestId, - final RESP response) { - Preconditions.checkNotNull(response); - return writePayload(alloc, requestId, MessageType.REQUEST_RESULT, response.serialize()); - } - - /** - * Serializes the exception containing the failure message sent to the - * {@link org.apache.flink.queryablestate.network.Client} in case of - * protocol related errors. - * - * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. - * @param requestId The id of the request to which the message refers to. - * @param cause The exception thrown at the server. - * @return A {@link ByteBuf} containing the serialized message. - */ - public static ByteBuf serializeRequestFailure( - final ByteBufAllocator alloc, - final long requestId, - final Throwable cause) throws IOException { - - final ByteBuf buf = alloc.ioBuffer(); - - // Frame length is set at the end - buf.writeInt(0); - writeHeader(buf, MessageType.REQUEST_FAILURE); - buf.writeLong(requestId); - - try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf); - ObjectOutput out = new ObjectOutputStream(bbos)) { - out.writeObject(cause); - } - - // Set frame length - int frameLength = buf.readableBytes() - Integer.BYTES; - buf.setInt(0, frameLength); - return buf; - } - - /** - * Serializes the failure message sent to the - * {@link org.apache.flink.queryablestate.network.Client} in case of - * server related errors. - * - * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. - * @param cause The exception thrown at the server. - * @return The failure message. - */ - public static ByteBuf serializeServerFailure( - final ByteBufAllocator alloc, - final Throwable cause) throws IOException { - - final ByteBuf buf = alloc.ioBuffer(); - - // Frame length is set at end - buf.writeInt(0); - writeHeader(buf, MessageType.SERVER_FAILURE); - - try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf); - ObjectOutput out = new ObjectOutputStream(bbos)) { - out.writeObject(cause); - } - - // Set frame length - int frameLength = buf.readableBytes() - Integer.BYTES; - buf.setInt(0, frameLength); - return buf; - } - - /** - * Helper for serializing the header. - * - * @param buf The {@link ByteBuf} to serialize the header into. - * @param messageType The {@link MessageType} of the message this header refers to. - */ - private static void writeHeader(final ByteBuf buf, final MessageType messageType) { - buf.writeInt(VERSION); - buf.writeInt(messageType.ordinal()); - } - - /** - * Helper for serializing the messages. - * - * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. - * @param requestId The id of the request to which the message refers to. - * @param messageType The {@link MessageType type of the message}. - * @param payload The serialized version of the message. - * @return A {@link ByteBuf} containing the serialized message. - */ - private static ByteBuf writePayload( - final ByteBufAllocator alloc, - final long requestId, - final MessageType messageType, - final byte[] payload) { - - final int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + payload.length; - final ByteBuf buf = alloc.ioBuffer(frameLength + Integer.BYTES); - - buf.writeInt(frameLength); - writeHeader(buf, messageType); - buf.writeLong(requestId); - buf.writeBytes(payload); - return buf; - } - - // ------------------------------------------------------------------------ - // Deserialization - // ------------------------------------------------------------------------ - - /** - * De-serializes the header and returns the {@link MessageType}. - * <pre> - * <b>The buffer is expected to be at the header position.</b> - * </pre> - * @param buf The {@link ByteBuf} containing the serialized header. - * @return The message type. - * @throws IllegalStateException If unexpected message version or message type. - */ - public static MessageType deserializeHeader(final ByteBuf buf) { - - // checking the version - int version = buf.readInt(); - Preconditions.checkState(version == VERSION, - "Version Mismatch: Found " + version + ", Expected: " + VERSION + '.'); - - // fetching the message type - int msgType = buf.readInt(); - MessageType[] values = MessageType.values(); - Preconditions.checkState(msgType >= 0 && msgType < values.length, - "Illegal message type with index " + msgType + '.'); - return values[msgType]; - } - - /** - * De-serializes the header and returns the {@link MessageType}. - * <pre> - * <b>The buffer is expected to be at the request id position.</b> - * </pre> - * @param buf The {@link ByteBuf} containing the serialized request id. - * @return The request id. - */ - public static long getRequestId(final ByteBuf buf) { - return buf.readLong(); - } - - /** - * De-serializes the request sent to the - * {@link org.apache.flink.queryablestate.network.AbstractServerBase}. - * <pre> - * <b>The buffer is expected to be at the request position.</b> - * </pre> - * @param buf The {@link ByteBuf} containing the serialized request. - * @return The request. - */ - public REQ deserializeRequest(final ByteBuf buf) { - Preconditions.checkNotNull(buf); - return requestDeserializer.deserializeMessage(buf); - } - - /** - * De-serializes the response sent to the - * {@link org.apache.flink.queryablestate.network.Client}. - * <pre> - * <b>The buffer is expected to be at the response position.</b> - * </pre> - * @param buf The {@link ByteBuf} containing the serialized response. - * @return The response. - */ - public RESP deserializeResponse(final ByteBuf buf) { - Preconditions.checkNotNull(buf); - return responseDeserializer.deserializeMessage(buf); - } - - /** - * De-serializes the {@link RequestFailure} sent to the - * {@link org.apache.flink.queryablestate.network.Client} in case of - * protocol related errors. - * <pre> - * <b>The buffer is expected to be at the correct position.</b> - * </pre> - * @param buf The {@link ByteBuf} containing the serialized failure message. - * @return The failure message. - */ - public static RequestFailure deserializeRequestFailure(final ByteBuf buf) throws IOException, ClassNotFoundException { - long requestId = buf.readLong(); - - Throwable cause; - try (ByteBufInputStream bis = new ByteBufInputStream(buf); - ObjectInputStream in = new ObjectInputStream(bis)) { - cause = (Throwable) in.readObject(); - } - return new RequestFailure(requestId, cause); - } - - /** - * De-serializes the failure message sent to the - * {@link org.apache.flink.queryablestate.network.Client} in case of - * server related errors. - * <pre> - * <b>The buffer is expected to be at the correct position.</b> - * </pre> - * @param buf The {@link ByteBuf} containing the serialized failure message. - * @return The failure message. - */ - public static Throwable deserializeServerFailure(final ByteBuf buf) throws IOException, ClassNotFoundException { - try (ByteBufInputStream bis = new ByteBufInputStream(buf); - ObjectInputStream in = new ObjectInputStream(bis)) { - return (Throwable) in.readObject(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java deleted file mode 100644 index 562ce93..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network.messages; - -import org.apache.flink.annotation.Internal; - -/** - * Expected message types during the communication between - * {@link org.apache.flink.queryablestate.network.Client client} and - * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}. - */ -@Internal -public enum MessageType { - - /** The message is a request. */ - REQUEST, - - /** The message is a successful response. */ - REQUEST_RESULT, - - /** The message indicates a protocol-related failure. */ - REQUEST_FAILURE, - - /** The message indicates a server failure. */ - SERVER_FAILURE -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java deleted file mode 100644 index 106199f..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network.messages; - -import org.apache.flink.annotation.Internal; - -/** - * A message indicating a protocol-related error. - */ -@Internal -public class RequestFailure { - - /** ID of the request responding to. */ - private final long requestId; - - /** Failure cause. Not allowed to be a user type. */ - private final Throwable cause; - - /** - * Creates a failure response to a {@link MessageBody}. - * - * @param requestId ID for the request responding to - * @param cause Failure cause (not allowed to be a user type) - */ - public RequestFailure(long requestId, Throwable cause) { - this.requestId = requestId; - this.cause = cause; - } - - /** - * Returns the request ID responding to. - * - * @return Request ID responding to - */ - public long getRequestId() { - return requestId; - } - - /** - * Returns the failure cause. - * - * @return Failure cause - */ - public Throwable getCause() { - return cause; - } - - @Override - public String toString() { - return "RequestFailure{" + - "requestId=" + requestId + - ", cause=" + cause + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java deleted file mode 100644 index 055a5d0..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.server; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException; -import org.apache.flink.queryablestate.UnknownKvStateIdException; -import org.apache.flink.queryablestate.messages.KvStateInternalRequest; -import org.apache.flink.queryablestate.messages.KvStateResponse; -import org.apache.flink.queryablestate.network.AbstractServerHandler; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.netty.KvStateRequestStats; -import org.apache.flink.runtime.state.internal.InternalKvState; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CompletableFuture; - -/** - * This handler dispatches asynchronous tasks, which query {@link InternalKvState} - * instances and write the result to the channel. - * - * <p>The network threads receive the message, deserialize it and dispatch the - * query task. The actual query is handled in a separate thread as it might - * otherwise block the network threads (file I/O etc.). - */ -@Internal [email protected] -public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> { - - private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class); - - /** KvState registry holding references to the KvState instances. */ - private final KvStateRegistry registry; - - /** - * Create the handler used by the {@link KvStateServerImpl}. - * - * @param server the {@link KvStateServerImpl} using the handler. - * @param kvStateRegistry registry to query. - * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages. - * @param stats server statistics collector. - */ - public KvStateServerHandler( - final KvStateServerImpl server, - final KvStateRegistry kvStateRegistry, - final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer, - final KvStateRequestStats stats) { - - super(server, serializer, stats); - this.registry = Preconditions.checkNotNull(kvStateRegistry); - } - - @Override - public CompletableFuture<KvStateResponse> handleRequest(final long requestId, final KvStateInternalRequest request) { - final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>(); - - try { - final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId()); - if (kvState == null) { - responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId())); - } else { - byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace(); - - byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace); - if (serializedResult != null) { - responseFuture.complete(new KvStateResponse(serializedResult)); - } else { - responseFuture.completeExceptionally(new UnknownKeyOrNamespaceException(getServerName())); - } - } - return responseFuture; - } catch (Throwable t) { - String errMsg = "Error while processing request with ID " + requestId + - ". Caused by: " + ExceptionUtils.stringifyException(t); - responseFuture.completeExceptionally(new RuntimeException(errMsg)); - return responseFuture; - } - } - - @Override - public void shutdown() { - // do nothing - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java deleted file mode 100644 index dfca915..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.server; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.queryablestate.messages.KvStateInternalRequest; -import org.apache.flink.queryablestate.messages.KvStateResponse; -import org.apache.flink.queryablestate.network.AbstractServerBase; -import org.apache.flink.queryablestate.network.AbstractServerHandler; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateServer; -import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.query.netty.KvStateRequestStats; -import org.apache.flink.util.Preconditions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetAddress; -import java.util.Iterator; - -/** - * The default implementation of the {@link KvStateServer}. - */ -@Internal -public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer { - - private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class); - - /** The {@link KvStateRegistry} to query for state instances. */ - private final KvStateRegistry kvStateRegistry; - - private final KvStateRequestStats stats; - - private MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer; - - /** - * Creates the state server. - * - * <p>The server is instantiated using reflection by the - * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats) - * QueryableStateUtils.createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)}. - * - * <p>The server needs to be started via {@link #start()} in order to bind - * to the configured bind address. - * - * @param bindAddress the address to listen to. - * @param bindPortIterator the port range to try to bind to. - * @param numEventLoopThreads number of event loop threads. - * @param numQueryThreads number of query threads. - * @param kvStateRegistry {@link KvStateRegistry} to query for state instances. - * @param stats the statistics collector. - */ - public KvStateServerImpl( - final InetAddress bindAddress, - final Iterator<Integer> bindPortIterator, - final Integer numEventLoopThreads, - final Integer numQueryThreads, - final KvStateRegistry kvStateRegistry, - final KvStateRequestStats stats) { - - super("Queryable State Server", bindAddress, bindPortIterator, numEventLoopThreads, numQueryThreads); - this.stats = Preconditions.checkNotNull(stats); - this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry); - } - - @Override - public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> initializeHandler() { - this.serializer = new MessageSerializer<>( - new KvStateInternalRequest.KvStateInternalRequestDeserializer(), - new KvStateResponse.KvStateResponseDeserializer()); - return new KvStateServerHandler(this, kvStateRegistry, serializer, stats); - } - - public MessageSerializer<KvStateInternalRequest, KvStateResponse> getSerializer() { - Preconditions.checkState(serializer != null, "Server " + getServerName() + " has not been started."); - return serializer; - } - - @Override - public void start() throws Throwable { - super.start(); - } - - @Override - public KvStateServerAddress getServerAddress() { - return super.getServerAddress(); - } - - @Override - public void shutdown() { - super.shutdown(); - } -}
