http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java new file mode 100644 index 0000000..e6d59de --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/Client.java @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.network.messages.MessageBody; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.io.network.netty.NettyBufferPool; +import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.runtime.query.netty.KvStateRequestStats; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; +import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; + +import java.nio.channels.ClosedChannelException; +import java.util.ArrayDeque; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * The base class for every client in the queryable state module. + * It is using pure netty to send and receive messages of type {@link MessageBody}. + * + * @param <REQ> the type of request the client will send. + * @param <RESP> the type of response the client expects to receive. + */ +@Internal +public class Client<REQ extends MessageBody, RESP extends MessageBody> { + + /** The name of the client. Used for logging and stack traces.*/ + private final String clientName; + + /** Netty's Bootstrap. */ + private final Bootstrap bootstrap; + + /** The serializer to be used for (de-)serializing messages. */ + private final MessageSerializer<REQ, RESP> messageSerializer; + + /** Statistics tracker. */ + private final KvStateRequestStats stats; + + /** Established connections. */ + private final Map<KvStateServerAddress, EstablishedConnection> establishedConnections = new ConcurrentHashMap<>(); + + /** Pending connections. */ + private final Map<KvStateServerAddress, PendingConnection> pendingConnections = new ConcurrentHashMap<>(); + + /** Atomic shut down flag. */ + private final AtomicBoolean shutDown = new AtomicBoolean(); + + /** + * Creates a client with the specified number of event loop threads. + * + * @param clientName the name of the client. + * @param numEventLoopThreads number of event loop threads (minimum 1). + * @param serializer the serializer used to (de-)serialize messages. + * @param stats the statistics collector. + */ + public Client( + final String clientName, + final int numEventLoopThreads, + final MessageSerializer<REQ, RESP> serializer, + final KvStateRequestStats stats) { + + Preconditions.checkArgument(numEventLoopThreads >= 1, + "Non-positive number of event loop threads."); + + this.clientName = Preconditions.checkNotNull(clientName); + this.messageSerializer = Preconditions.checkNotNull(serializer); + this.stats = Preconditions.checkNotNull(stats); + + final ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Flink " + clientName + " Event Loop Thread %d") + .build(); + + final EventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory); + final ByteBufAllocator bufferPool = new NettyBufferPool(numEventLoopThreads); + + this.bootstrap = new Bootstrap() + .group(nioGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.ALLOCATOR, bufferPool) + .handler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + channel.pipeline() + .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) + .addLast(new ChunkedWriteHandler()); + } + }); + } + + public String getClientName() { + return clientName; + } + + public CompletableFuture<RESP> sendRequest(final KvStateServerAddress serverAddress, final REQ request) { + if (shutDown.get()) { + return FutureUtils.getFailedFuture(new IllegalStateException("Shut down")); + } + + EstablishedConnection connection = establishedConnections.get(serverAddress); + if (connection != null) { + return connection.sendRequest(request); + } else { + PendingConnection pendingConnection = pendingConnections.get(serverAddress); + if (pendingConnection != null) { + // There was a race, use the existing pending connection. + return pendingConnection.sendRequest(request); + } else { + // We try to connect to the server. + PendingConnection pending = new PendingConnection(serverAddress, messageSerializer); + PendingConnection previous = pendingConnections.putIfAbsent(serverAddress, pending); + + if (previous == null) { + // OK, we are responsible to connect. + bootstrap.connect(serverAddress.getHost(), serverAddress.getPort()).addListener(pending); + return pending.sendRequest(request); + } else { + // There was a race, use the existing pending connection. + return previous.sendRequest(request); + } + } + } + } + + /** + * Shuts down the client and closes all connections. + * + * <p>After a call to this method, all returned futures will be failed. + */ + public void shutdown() { + if (shutDown.compareAndSet(false, true)) { + for (Map.Entry<KvStateServerAddress, EstablishedConnection> conn : establishedConnections.entrySet()) { + if (establishedConnections.remove(conn.getKey(), conn.getValue())) { + conn.getValue().close(); + } + } + + for (Map.Entry<KvStateServerAddress, PendingConnection> conn : pendingConnections.entrySet()) { + if (pendingConnections.remove(conn.getKey()) != null) { + conn.getValue().close(); + } + } + + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); + } + } + } + } + + /** + * A pending connection that is in the process of connecting. + */ + private class PendingConnection implements ChannelFutureListener { + + /** Lock to guard the connect call, channel hand in, etc. */ + private final Object connectLock = new Object(); + + /** Address of the server we are connecting to. */ + private final KvStateServerAddress serverAddress; + + private final MessageSerializer<REQ, RESP> serializer; + + /** Queue of requests while connecting. */ + private final ArrayDeque<PendingRequest> queuedRequests = new ArrayDeque<>(); + + /** The established connection after the connect succeeds. */ + private EstablishedConnection established; + + /** Closed flag. */ + private boolean closed; + + /** Failure cause if something goes wrong. */ + private Throwable failureCause; + + /** + * Creates a pending connection to the given server. + * + * @param serverAddress Address of the server to connect to. + */ + private PendingConnection( + final KvStateServerAddress serverAddress, + final MessageSerializer<REQ, RESP> serializer) { + this.serverAddress = serverAddress; + this.serializer = serializer; + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + handInChannel(future.channel()); + } else { + close(future.cause()); + } + } + + /** + * Returns a future holding the serialized request result. + * + * <p>If the channel has been established, forward the call to the + * established channel, otherwise queue it for when the channel is + * handed in. + * + * @param request the request to be sent. + * @return Future holding the serialized result + */ + public CompletableFuture<RESP> sendRequest(REQ request) { + synchronized (connectLock) { + if (failureCause != null) { + return FutureUtils.getFailedFuture(failureCause); + } else if (closed) { + return FutureUtils.getFailedFuture(new ClosedChannelException()); + } else { + if (established != null) { + return established.sendRequest(request); + } else { + // Queue this and handle when connected + final PendingRequest pending = new PendingRequest(request); + queuedRequests.add(pending); + return pending; + } + } + } + } + + /** + * Hands in a channel after a successful connection. + * + * @param channel Channel to hand in + */ + private void handInChannel(Channel channel) { + synchronized (connectLock) { + if (closed || failureCause != null) { + // Close the channel and we are done. Any queued requests + // are removed on the close/failure call and after that no + // new ones can be enqueued. + channel.close(); + } else { + established = new EstablishedConnection(serverAddress, serializer, channel); + + while (!queuedRequests.isEmpty()) { + final PendingRequest pending = queuedRequests.poll(); + + established.sendRequest(pending.request) + .thenAccept(resp -> pending.complete(resp)) + .exceptionally(throwable -> { + pending.completeExceptionally(throwable); + return null; + }); + } + + // Publish the channel for the general public + establishedConnections.put(serverAddress, established); + pendingConnections.remove(serverAddress); + + // Check shut down for possible race with shut down. We + // don't want any lingering connections after shut down, + // which can happen if we don't check this here. + if (shutDown.get()) { + if (establishedConnections.remove(serverAddress, established)) { + established.close(); + } + } + } + } + } + + /** + * Close the connecting channel with a ClosedChannelException. + */ + private void close() { + close(new ClosedChannelException()); + } + + /** + * Close the connecting channel with an Exception (can be {@code null}) + * or forward to the established channel. + */ + private void close(Throwable cause) { + synchronized (connectLock) { + if (!closed) { + if (failureCause == null) { + failureCause = cause; + } + + if (established != null) { + established.close(); + } else { + PendingRequest pending; + while ((pending = queuedRequests.poll()) != null) { + pending.completeExceptionally(cause); + } + } + closed = true; + } + } + } + + @Override + public String toString() { + synchronized (connectLock) { + return "PendingConnection{" + + "serverAddress=" + serverAddress + + ", queuedRequests=" + queuedRequests.size() + + ", established=" + (established != null) + + ", closed=" + closed + + '}'; + } + } + + /** + * A pending request queued while the channel is connecting. + */ + private final class PendingRequest extends CompletableFuture<RESP> { + + private final REQ request; + + private PendingRequest(REQ request) { + this.request = request; + } + } + } + + /** + * An established connection that wraps the actual channel instance and is + * registered at the {@link ClientHandler} for callbacks. + */ + private class EstablishedConnection implements ClientHandlerCallback<RESP> { + + /** Address of the server we are connected to. */ + private final KvStateServerAddress serverAddress; + + /** The actual TCP channel. */ + private final Channel channel; + + /** Pending requests keyed by request ID. */ + private final ConcurrentHashMap<Long, TimestampedCompletableFuture> pendingRequests = new ConcurrentHashMap<>(); + + /** Current request number used to assign unique request IDs. */ + private final AtomicLong requestCount = new AtomicLong(); + + /** Reference to a failure that was reported by the channel. */ + private final AtomicReference<Throwable> failureCause = new AtomicReference<>(); + + /** + * Creates an established connection with the given channel. + * + * @param serverAddress Address of the server connected to + * @param channel The actual TCP channel + */ + EstablishedConnection( + final KvStateServerAddress serverAddress, + final MessageSerializer<REQ, RESP> serializer, + final Channel channel) { + + this.serverAddress = Preconditions.checkNotNull(serverAddress); + this.channel = Preconditions.checkNotNull(channel); + + // Add the client handler with the callback + channel.pipeline().addLast( + getClientName() + " Handler", + new ClientHandler<>(clientName, serializer, this) + ); + + stats.reportActiveConnection(); + } + + /** + * Close the channel with a ClosedChannelException. + */ + void close() { + close(new ClosedChannelException()); + } + + /** + * Close the channel with a cause. + * + * @param cause The cause to close the channel with. + * @return Channel close future + */ + private boolean close(Throwable cause) { + if (failureCause.compareAndSet(null, cause)) { + channel.close(); + stats.reportInactiveConnection(); + + for (long requestId : pendingRequests.keySet()) { + TimestampedCompletableFuture pending = pendingRequests.remove(requestId); + if (pending != null && pending.completeExceptionally(cause)) { + stats.reportFailedRequest(); + } + } + return true; + } + return false; + } + + /** + * Returns a future holding the serialized request result. + * @param request the request to be sent. + * @return Future holding the serialized result + */ + CompletableFuture<RESP> sendRequest(REQ request) { + TimestampedCompletableFuture requestPromiseTs = + new TimestampedCompletableFuture(System.nanoTime()); + try { + final long requestId = requestCount.getAndIncrement(); + pendingRequests.put(requestId, requestPromiseTs); + + stats.reportRequest(); + + ByteBuf buf = MessageSerializer.serializeRequest(channel.alloc(), requestId, request); + + channel.writeAndFlush(buf).addListener((ChannelFutureListener) future -> { + if (!future.isSuccess()) { + // Fail promise if not failed to write + TimestampedCompletableFuture pending = pendingRequests.remove(requestId); + if (pending != null && pending.completeExceptionally(future.cause())) { + stats.reportFailedRequest(); + } + } + }); + + // Check failure for possible race. We don't want any lingering + // promises after a failure, which can happen if we don't check + // this here. Note that close is treated as a failure as well. + Throwable failure = failureCause.get(); + if (failure != null) { + // Remove from pending requests to guard against concurrent + // removal and to make sure that we only count it once as failed. + TimestampedCompletableFuture pending = pendingRequests.remove(requestId); + if (pending != null && pending.completeExceptionally(failure)) { + stats.reportFailedRequest(); + } + } + } catch (Throwable t) { + requestPromiseTs.completeExceptionally(t); + } + + return requestPromiseTs; + } + + @Override + public void onRequestResult(long requestId, RESP response) { + TimestampedCompletableFuture pending = pendingRequests.remove(requestId); + if (pending != null && pending.complete(response)) { + long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L; + stats.reportSuccessfulRequest(durationMillis); + } + } + + @Override + public void onRequestFailure(long requestId, Throwable cause) { + TimestampedCompletableFuture pending = pendingRequests.remove(requestId); + if (pending != null && pending.completeExceptionally(cause)) { + stats.reportFailedRequest(); + } + } + + @Override + public void onFailure(Throwable cause) { + if (close(cause)) { + // Remove from established channels, otherwise future + // requests will be handled by this failed channel. + establishedConnections.remove(serverAddress, this); + } + } + + @Override + public String toString() { + return "EstablishedConnection{" + + "serverAddress=" + serverAddress + + ", channel=" + channel + + ", pendingRequests=" + pendingRequests.size() + + ", requestCount=" + requestCount + + ", failureCause=" + failureCause + + '}'; + } + + /** + * Pair of promise and a timestamp. + */ + private class TimestampedCompletableFuture extends CompletableFuture<RESP> { + + private final long timestampInNanos; + + TimestampedCompletableFuture(long timestampInNanos) { + this.timestampInNanos = timestampInNanos; + } + + public long getTimestamp() { + return timestampInNanos; + } + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java new file mode 100644 index 0000000..fc9b1d4 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.network.messages.MessageBody; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.messages.MessageType; +import org.apache.flink.queryablestate.network.messages.RequestFailure; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.channels.ClosedChannelException; + +/** + * The handler used by a {@link Client} to handling incoming messages. + * + * @param <REQ> the type of request the client will send. + * @param <RESP> the type of response the client expects to receive. + */ +@Internal +public class ClientHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(ClientHandler.class); + + private final String clientName; + + private final MessageSerializer<REQ, RESP> serializer; + + private final ClientHandlerCallback<RESP> callback; + + /** + * Creates a handler with the callback. + * + * @param clientName the name of the client. + * @param serializer the serializer used to (de-)serialize messages. + * @param callback Callback for responses. + */ + public ClientHandler( + final String clientName, + final MessageSerializer<REQ, RESP> serializer, + final ClientHandlerCallback<RESP> callback) { + + this.clientName = Preconditions.checkNotNull(clientName); + this.serializer = Preconditions.checkNotNull(serializer); + this.callback = Preconditions.checkNotNull(callback); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + try { + ByteBuf buf = (ByteBuf) msg; + MessageType msgType = MessageSerializer.deserializeHeader(buf); + + if (msgType == MessageType.REQUEST_RESULT) { + long requestId = MessageSerializer.getRequestId(buf); + RESP result = serializer.deserializeResponse(buf); + callback.onRequestResult(requestId, result); + } else if (msgType == MessageType.REQUEST_FAILURE) { + RequestFailure failure = MessageSerializer.deserializeRequestFailure(buf); + callback.onRequestFailure(failure.getRequestId(), failure.getCause()); + } else if (msgType == MessageType.SERVER_FAILURE) { + throw MessageSerializer.deserializeServerFailure(buf); + } else { + throw new IllegalStateException("Unexpected response type '" + msgType + "'"); + } + } catch (Throwable t1) { + try { + callback.onFailure(t1); + } catch (Throwable t2) { + LOG.error("Failed to notify callback about failure", t2); + } + } finally { + ReferenceCountUtil.release(msg); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + try { + callback.onFailure(cause); + } catch (Throwable t) { + LOG.error("Failed to notify callback about failure", t); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + // Only the client is expected to close the channel. Otherwise it + // indicates a failure. Note that this will be invoked in both cases + // though. If the callback closed the channel, the callback must be + // ignored. + try { + callback.onFailure(new ClosedChannelException()); + } catch (Throwable t) { + LOG.error("Failed to notify callback about failure", t); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java new file mode 100644 index 0000000..00ce1ed --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.network.messages.MessageBody; + +/** + * Callback for {@link ClientHandler}. + */ +@Internal +public interface ClientHandlerCallback<RESP extends MessageBody> { + + /** + * Called on a successful request. + * + * @param requestId ID of the request + * @param response The received response + */ + void onRequestResult(long requestId, RESP response); + + /** + * Called on a failed request. + * + * @param requestId ID of the request + * @param cause Cause of the request failure + */ + void onRequestFailure(long requestId, Throwable cause); + + /** + * Called on any failure, which is not related to a specific request. + * + * <p>This can be for example a caught Exception in the channel pipeline + * or an unexpected channel close. + * + * @param cause Cause of the failure + */ + void onFailure(Throwable cause); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java new file mode 100644 index 0000000..f26c267 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network.messages; + +import org.apache.flink.annotation.Internal; + +/** + * The base class for every message exchanged during the communication between + * {@link org.apache.flink.queryablestate.network.Client client} and + * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}. + * + * <p>Every such message should also have a {@link MessageDeserializer}. + */ +@Internal +public abstract class MessageBody { + + /** + * Serializes the message into a byte array. + * @return A byte array with the serialized content of the message. + */ + public abstract byte[] serialize(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java new file mode 100644 index 0000000..436fb82 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network.messages; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +/** + * A utility used to deserialize a {@link MessageBody message}. + * @param <M> The type of the message to be deserialized. + * It has to extend {@link MessageBody} + */ +@Internal +public interface MessageDeserializer<M extends MessageBody> { + + /** + * Deserializes a message contained in a byte buffer. + * @param buf the buffer containing the message. + * @return The deserialized message. + */ + M deserializeMessage(ByteBuf buf); +} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java index 32bca64..c0a0d32 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java @@ -18,11 +18,7 @@ package org.apache.flink.queryablestate.network.messages; -import org.apache.flink.queryablestate.messages.KvStateRequest; -import org.apache.flink.queryablestate.messages.KvStateRequestFailure; -import org.apache.flink.queryablestate.messages.KvStateRequestResult; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.util.AbstractID; +import org.apache.flink.annotation.Internal; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; @@ -37,8 +33,8 @@ import java.io.ObjectOutputStream; /** * Serialization and deserialization of messages exchanged between - * {@link org.apache.flink.queryablestate.client.KvStateClient client} and - * {@link org.apache.flink.queryablestate.server.KvStateServerImpl server}. + * {@link org.apache.flink.queryablestate.network.Client client} and + * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}. * * <p>The binary messages have the following format: * @@ -52,8 +48,12 @@ import java.io.ObjectOutputStream; * </pre> * * <p>The concrete content of a message depends on the {@link MessageType}. + * + * @param <REQ> Type of the requests of the protocol. + * @param <RESP> Type of the responses of the protocol. */ -public final class MessageSerializer { +@Internal +public final class MessageSerializer<REQ extends MessageBody, RESP extends MessageBody> { /** The serialization version ID. */ private static final int VERSION = 0x79a1b710; @@ -64,78 +64,58 @@ public final class MessageSerializer { /** Byte length of the request id. */ private static final int REQUEST_ID_SIZE = Long.BYTES; + /** The constructor of the {@link MessageBody client requests}. Used for deserialization. */ + private final MessageDeserializer<REQ> requestDeserializer; + + /** The constructor of the {@link MessageBody server responses}. Used for deserialization. */ + private final MessageDeserializer<RESP> responseDeserializer; + + public MessageSerializer(MessageDeserializer<REQ> requestDeser, MessageDeserializer<RESP> responseDeser) { + requestDeserializer = Preconditions.checkNotNull(requestDeser); + responseDeserializer = Preconditions.checkNotNull(responseDeser); + } + // ------------------------------------------------------------------------ // Serialization // ------------------------------------------------------------------------ /** - * Allocates a buffer and serializes the KvState request into it. + * Serializes the request sent to the + * {@link org.apache.flink.queryablestate.network.AbstractServerBase}. * - * @param alloc ByteBuf allocator for the buffer to - * serialize message into - * @param requestId ID for this request - * @param kvStateId ID of the requested KvState instance - * @param serializedKeyAndNamespace Serialized key and namespace to request - * from the KvState instance. - * @return Serialized KvState request message + * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. + * @param requestId The id of the request to which the message refers to. + * @param request The request to be serialized. + * @return A {@link ByteBuf} containing the serialized message. */ - public static ByteBuf serializeKvStateRequest( - ByteBufAllocator alloc, - long requestId, - KvStateID kvStateId, - byte[] serializedKeyAndNamespace) { - - // Header + request ID + KvState ID + Serialized namespace - int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + AbstractID.SIZE + (Integer.BYTES + serializedKeyAndNamespace.length); - ByteBuf buf = alloc.ioBuffer(frameLength + 4); // +4 for frame length - - buf.writeInt(frameLength); - - writeHeader(buf, MessageType.REQUEST); - - buf.writeLong(requestId); - buf.writeLong(kvStateId.getLowerPart()); - buf.writeLong(kvStateId.getUpperPart()); - buf.writeInt(serializedKeyAndNamespace.length); - buf.writeBytes(serializedKeyAndNamespace); - - return buf; + public static <REQ extends MessageBody> ByteBuf serializeRequest( + final ByteBufAllocator alloc, + final long requestId, + final REQ request) { + Preconditions.checkNotNull(request); + return writePayload(alloc, requestId, MessageType.REQUEST, request.serialize()); } /** - * Allocates a buffer and serializes the KvState request result into it. + * Serializes the response sent to the + * {@link org.apache.flink.queryablestate.network.Client}. * - * @param alloc ByteBuf allocator for the buffer to serialize message into - * @param requestId ID for this request - * @param serializedResult Serialized Result - * @return Serialized KvState request result message + * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. + * @param requestId The id of the request to which the message refers to. + * @param response The response to be serialized. + * @return A {@link ByteBuf} containing the serialized message. */ - public static ByteBuf serializeKvStateRequestResult( - ByteBufAllocator alloc, - long requestId, - byte[] serializedResult) { - - Preconditions.checkNotNull(serializedResult, "Serialized result"); - - // Header + request ID + serialized result - int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + 4 + serializedResult.length; - - // TODO: 10/5/17 there was a bug all this time? - ByteBuf buf = alloc.ioBuffer(frameLength + 4); - - buf.writeInt(frameLength); - writeHeader(buf, MessageType.REQUEST_RESULT); - buf.writeLong(requestId); - - buf.writeInt(serializedResult.length); - buf.writeBytes(serializedResult); - - return buf; + public static <RESP extends MessageBody> ByteBuf serializeResponse( + final ByteBufAllocator alloc, + final long requestId, + final RESP response) { + Preconditions.checkNotNull(response); + return writePayload(alloc, requestId, MessageType.REQUEST_RESULT, response.serialize()); } /** * Serializes the exception containing the failure message sent to the - * {@link org.apache.flink.queryablestate.client.KvStateClient} in case of + * {@link org.apache.flink.queryablestate.network.Client} in case of * protocol related errors. * * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. @@ -143,7 +123,7 @@ public final class MessageSerializer { * @param cause The exception thrown at the server. * @return A {@link ByteBuf} containing the serialized message. */ - public static ByteBuf serializeKvStateRequestFailure( + public static ByteBuf serializeRequestFailure( final ByteBufAllocator alloc, final long requestId, final Throwable cause) throws IOException { @@ -168,7 +148,7 @@ public final class MessageSerializer { /** * Serializes the failure message sent to the - * {@link org.apache.flink.queryablestate.client.KvStateClient} in case of + * {@link org.apache.flink.queryablestate.network.Client} in case of * server related errors. * * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. @@ -207,6 +187,31 @@ public final class MessageSerializer { buf.writeInt(messageType.ordinal()); } + /** + * Helper for serializing the messages. + * + * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. + * @param requestId The id of the request to which the message refers to. + * @param messageType The {@link MessageType type of the message}. + * @param payload The serialized version of the message. + * @return A {@link ByteBuf} containing the serialized message. + */ + private static ByteBuf writePayload( + final ByteBufAllocator alloc, + final long requestId, + final MessageType messageType, + final byte[] payload) { + + final int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + payload.length; + final ByteBuf buf = alloc.ioBuffer(frameLength + Integer.BYTES); + + buf.writeInt(frameLength); + writeHeader(buf, messageType); + buf.writeLong(requestId); + buf.writeBytes(payload); + return buf; + } + // ------------------------------------------------------------------------ // Deserialization // ------------------------------------------------------------------------ @@ -230,71 +235,54 @@ public final class MessageSerializer { // fetching the message type int msgType = buf.readInt(); MessageType[] values = MessageType.values(); - Preconditions.checkState(msgType >= 0 && msgType <= values.length, + Preconditions.checkState(msgType >= 0 && msgType < values.length, "Illegal message type with index " + msgType + '.'); return values[msgType]; } /** - * Deserializes the KvState request message. - * - * <p><strong>Important</strong>: the returned buffer is sliced from the - * incoming ByteBuf stream and retained. Therefore, it needs to be recycled - * by the consumer. - * - * @param buf Buffer to deserialize (expected to be positioned after header) - * @return Deserialized KvStateRequest + * De-serializes the header and returns the {@link MessageType}. + * <pre> + * <b>The buffer is expected to be at the request id position.</b> + * </pre> + * @param buf The {@link ByteBuf} containing the serialized request id. + * @return The request id. */ - public static KvStateRequest deserializeKvStateRequest(ByteBuf buf) { - long requestId = buf.readLong(); - KvStateID kvStateId = new KvStateID(buf.readLong(), buf.readLong()); - - // Serialized key and namespace - int length = buf.readInt(); - - if (length < 0) { - throw new IllegalArgumentException("Negative length for serialized key and namespace. " + - "This indicates a serialization error."); - } - - // Copy the buffer in order to be able to safely recycle the ByteBuf - byte[] serializedKeyAndNamespace = new byte[length]; - if (length > 0) { - buf.readBytes(serializedKeyAndNamespace); - } - - return new KvStateRequest(requestId, kvStateId, serializedKeyAndNamespace); + public static long getRequestId(final ByteBuf buf) { + return buf.readLong(); } /** - * Deserializes the KvState request result. - * - * @param buf Buffer to deserialize (expected to be positioned after header) - * @return Deserialized KvStateRequestResult + * De-serializes the request sent to the + * {@link org.apache.flink.queryablestate.network.AbstractServerBase}. + * <pre> + * <b>The buffer is expected to be at the request position.</b> + * </pre> + * @param buf The {@link ByteBuf} containing the serialized request. + * @return The request. */ - public static KvStateRequestResult deserializeKvStateRequestResult(ByteBuf buf) { - long requestId = buf.readLong(); - - // Serialized KvState - int length = buf.readInt(); - - if (length < 0) { - throw new IllegalArgumentException("Negative length for serialized result. " + - "This indicates a serialization error."); - } - - byte[] serializedValue = new byte[length]; - - if (length > 0) { - buf.readBytes(serializedValue); - } + public REQ deserializeRequest(final ByteBuf buf) { + Preconditions.checkNotNull(buf); + return requestDeserializer.deserializeMessage(buf); + } - return new KvStateRequestResult(requestId, serializedValue); + /** + * De-serializes the response sent to the + * {@link org.apache.flink.queryablestate.network.Client}. + * <pre> + * <b>The buffer is expected to be at the response position.</b> + * </pre> + * @param buf The {@link ByteBuf} containing the serialized response. + * @return The response. + */ + public RESP deserializeResponse(final ByteBuf buf) { + Preconditions.checkNotNull(buf); + return responseDeserializer.deserializeMessage(buf); } /** - * De-serializes the {@link KvStateRequestFailure} sent to the - * {@link org.apache.flink.queryablestate.client.KvStateClient} in case of + * De-serializes the {@link RequestFailure} sent to the + * {@link org.apache.flink.queryablestate.network.Client} in case of * protocol related errors. * <pre> * <b>The buffer is expected to be at the correct position.</b> @@ -302,7 +290,7 @@ public final class MessageSerializer { * @param buf The {@link ByteBuf} containing the serialized failure message. * @return The failure message. */ - public static KvStateRequestFailure deserializeKvStateRequestFailure(final ByteBuf buf) throws IOException, ClassNotFoundException { + public static RequestFailure deserializeRequestFailure(final ByteBuf buf) throws IOException, ClassNotFoundException { long requestId = buf.readLong(); Throwable cause; @@ -310,12 +298,12 @@ public final class MessageSerializer { ObjectInputStream in = new ObjectInputStream(bis)) { cause = (Throwable) in.readObject(); } - return new KvStateRequestFailure(requestId, cause); + return new RequestFailure(requestId, cause); } /** * De-serializes the failure message sent to the - * {@link org.apache.flink.queryablestate.client.KvStateClient} in case of + * {@link org.apache.flink.queryablestate.network.Client} in case of * server related errors. * <pre> * <b>The buffer is expected to be at the correct position.</b> http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java index 4e4435d..562ce93 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java @@ -18,11 +18,14 @@ package org.apache.flink.queryablestate.network.messages; +import org.apache.flink.annotation.Internal; + /** * Expected message types during the communication between - * {@link org.apache.flink.queryablestate.client.KvStateClient state client} and - * {@link org.apache.flink.queryablestate.server.KvStateServerImpl state server}. + * {@link org.apache.flink.queryablestate.network.Client client} and + * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}. */ +@Internal public enum MessageType { /** The message is a request. */ http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java new file mode 100644 index 0000000..106199f --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network.messages; + +import org.apache.flink.annotation.Internal; + +/** + * A message indicating a protocol-related error. + */ +@Internal +public class RequestFailure { + + /** ID of the request responding to. */ + private final long requestId; + + /** Failure cause. Not allowed to be a user type. */ + private final Throwable cause; + + /** + * Creates a failure response to a {@link MessageBody}. + * + * @param requestId ID for the request responding to + * @param cause Failure cause (not allowed to be a user type) + */ + public RequestFailure(long requestId, Throwable cause) { + this.requestId = requestId; + this.cause = cause; + } + + /** + * Returns the request ID responding to. + * + * @return Request ID responding to + */ + public long getRequestId() { + return requestId; + } + + /** + * Returns the failure cause. + * + * @return Failure cause + */ + public Throwable getCause() { + return cause; + } + + @Override + public String toString() { + return "RequestFailure{" + + "requestId=" + requestId + + ", cause=" + cause + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/ChunkedByteBuf.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/ChunkedByteBuf.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/ChunkedByteBuf.java deleted file mode 100644 index f10969e..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/ChunkedByteBuf.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.server; - -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput; -import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; - -/** - * A {@link ByteBuf} instance to be consumed in chunks by {@link ChunkedWriteHandler}, - * respecting the high and low watermarks. - * - * @see <a href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High Watermarks</a> - */ -public class ChunkedByteBuf implements ChunkedInput<ByteBuf> { - - /** The buffer to chunk. */ - private final ByteBuf buf; - - /** Size of chunks. */ - private final int chunkSize; - - /** Closed flag. */ - private boolean isClosed; - - /** End of input flag. */ - private boolean isEndOfInput; - - public ChunkedByteBuf(ByteBuf buf, int chunkSize) { - this.buf = Preconditions.checkNotNull(buf, "Buffer"); - Preconditions.checkArgument(chunkSize > 0, "Non-positive chunk size"); - this.chunkSize = chunkSize; - } - - @Override - public boolean isEndOfInput() throws Exception { - return isClosed || isEndOfInput; - } - - @Override - public void close() throws Exception { - if (!isClosed) { - // If we did not consume the whole buffer yet, we have to release - // it here. Otherwise, it's the responsibility of the consumer. - if (!isEndOfInput) { - buf.release(); - } - - isClosed = true; - } - } - - @Override - public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { - if (isClosed) { - return null; - } else if (buf.readableBytes() <= chunkSize) { - isEndOfInput = true; - - // Don't retain as the consumer is responsible to release it - return buf.slice(); - } else { - // Return a chunk sized slice of the buffer. The ref count is - // shared with the original buffer. That's why we need to retain - // a reference here. - return buf.readSlice(chunkSize).retain(); - } - } - - @Override - public String toString() { - return "ChunkedByteBuf{" + - "buf=" + buf + - ", chunkSize=" + chunkSize + - ", isClosed=" + isClosed + - ", isEndOfInput=" + isEndOfInput + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java index 9a31fca..055a5d0 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java @@ -18,31 +18,25 @@ package org.apache.flink.queryablestate.server; -import org.apache.flink.queryablestate.UnknownKeyOrNamespace; -import org.apache.flink.queryablestate.UnknownKvStateID; -import org.apache.flink.queryablestate.messages.KvStateRequest; +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException; +import org.apache.flink.queryablestate.UnknownKvStateIdException; +import org.apache.flink.queryablestate.messages.KvStateInternalRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; +import org.apache.flink.queryablestate.network.AbstractServerHandler; import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.queryablestate.network.messages.MessageType; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.netty.KvStateRequestStats; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; -import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Objects; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletableFuture; /** * This handler dispatches asynchronous tasks, which query {@link InternalKvState} @@ -52,257 +46,62 @@ import java.util.concurrent.TimeUnit; * query task. The actual query is handled in a separate thread as it might * otherwise block the network threads (file I/O etc.). */ +@Internal @ChannelHandler.Sharable -public class KvStateServerHandler extends ChannelInboundHandlerAdapter { +public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> { private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class); /** KvState registry holding references to the KvState instances. */ private final KvStateRegistry registry; - /** Thread pool for query execution. */ - private final ExecutorService queryExecutor; - - /** Exposed server statistics. */ - private final KvStateRequestStats stats; - /** - * Create the handler. + * Create the handler used by the {@link KvStateServerImpl}. * - * @param kvStateRegistry Registry to query. - * @param queryExecutor Thread pool for query execution. - * @param stats Exposed server statistics. + * @param server the {@link KvStateServerImpl} using the handler. + * @param kvStateRegistry registry to query. + * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages. + * @param stats server statistics collector. */ public KvStateServerHandler( - KvStateRegistry kvStateRegistry, - ExecutorService queryExecutor, - KvStateRequestStats stats) { + final KvStateServerImpl server, + final KvStateRegistry kvStateRegistry, + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer, + final KvStateRequestStats stats) { - this.registry = Objects.requireNonNull(kvStateRegistry, "KvStateRegistry"); - this.queryExecutor = Objects.requireNonNull(queryExecutor, "Query thread pool"); - this.stats = Objects.requireNonNull(stats, "KvStateRequestStats"); + super(server, serializer, stats); + this.registry = Preconditions.checkNotNull(kvStateRegistry); } @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - stats.reportActiveConnection(); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - stats.reportInactiveConnection(); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - KvStateRequest request = null; + public CompletableFuture<KvStateResponse> handleRequest(final long requestId, final KvStateInternalRequest request) { + final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>(); try { - ByteBuf buf = (ByteBuf) msg; - MessageType msgType = MessageSerializer.deserializeHeader(buf); - - if (msgType == MessageType.REQUEST) { - // ------------------------------------------------------------ - // Request - // ------------------------------------------------------------ - request = MessageSerializer.deserializeKvStateRequest(buf); - - stats.reportRequest(); - - InternalKvState<?> kvState = registry.getKvState(request.getKvStateId()); - - if (kvState != null) { - // Execute actual query async, because it is possibly - // blocking (e.g. file I/O). - // - // A submission failure is not treated as fatal. - queryExecutor.submit(new AsyncKvStateQueryTask(ctx, request, kvState, stats)); - } else { - ByteBuf unknown = MessageSerializer.serializeKvStateRequestFailure( - ctx.alloc(), - request.getRequestId(), - new UnknownKvStateID(request.getKvStateId())); - - ctx.writeAndFlush(unknown); - - stats.reportFailedRequest(); - } + final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId()); + if (kvState == null) { + responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId())); } else { - // ------------------------------------------------------------ - // Unexpected - // ------------------------------------------------------------ - ByteBuf failure = MessageSerializer.serializeServerFailure( - ctx.alloc(), - new IllegalArgumentException("Unexpected message type " + msgType - + ". KvStateServerHandler expects " - + MessageType.REQUEST + " messages.")); - - ctx.writeAndFlush(failure); - } - } catch (Throwable t) { - String stringifiedCause = ExceptionUtils.stringifyException(t); - - ByteBuf err; - if (request != null) { - String errMsg = "Failed to handle incoming request with ID " + - request.getRequestId() + ". Caused by: " + stringifiedCause; - err = MessageSerializer.serializeKvStateRequestFailure( - ctx.alloc(), - request.getRequestId(), - new RuntimeException(errMsg)); - - stats.reportFailedRequest(); - } else { - String errMsg = "Failed to handle incoming message. Caused by: " + stringifiedCause; - err = MessageSerializer.serializeServerFailure( - ctx.alloc(), - new RuntimeException(errMsg)); - } - - ctx.writeAndFlush(err); - } finally { - // IMPORTANT: We have to always recycle the incoming buffer. - // Otherwise we will leak memory out of Netty's buffer pool. - // - // If any operation ever holds on to the buffer, it is the - // responsibility of that operation to retain the buffer and - // release it later. - ReferenceCountUtil.release(msg); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - String stringifiedCause = ExceptionUtils.stringifyException(cause); - String msg = "Exception in server pipeline. Caused by: " + stringifiedCause; - - ByteBuf err = MessageSerializer.serializeServerFailure( - ctx.alloc(), - new RuntimeException(msg)); - - ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE); - } - - /** - * Task to execute the actual query against the {@link InternalKvState} instance. - */ - private static class AsyncKvStateQueryTask implements Runnable { - - private final ChannelHandlerContext ctx; - - private final KvStateRequest request; - - private final InternalKvState<?> kvState; - - private final KvStateRequestStats stats; - - private final long creationNanos; - - public AsyncKvStateQueryTask( - ChannelHandlerContext ctx, - KvStateRequest request, - InternalKvState<?> kvState, - KvStateRequestStats stats) { - - this.ctx = Objects.requireNonNull(ctx, "Channel handler context"); - this.request = Objects.requireNonNull(request, "State query"); - this.kvState = Objects.requireNonNull(kvState, "KvState"); - this.stats = Objects.requireNonNull(stats, "State query stats"); - this.creationNanos = System.nanoTime(); - } - - @Override - public void run() { - boolean success = false; - - try { - if (!ctx.channel().isActive()) { - return; - } - - // Query the KvState instance byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace(); - byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace); + byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace); if (serializedResult != null) { - // We found some data, success! - ByteBuf buf = MessageSerializer.serializeKvStateRequestResult( - ctx.alloc(), - request.getRequestId(), - serializedResult); - - int highWatermark = ctx.channel().config().getWriteBufferHighWaterMark(); - - ChannelFuture write; - if (buf.readableBytes() <= highWatermark) { - write = ctx.writeAndFlush(buf); - } else { - write = ctx.writeAndFlush(new ChunkedByteBuf(buf, highWatermark)); - } - - write.addListener(new QueryResultWriteListener()); - - success = true; + responseFuture.complete(new KvStateResponse(serializedResult)); } else { - // No data for the key/namespace. This is considered to be - // a failure. - ByteBuf unknownKey = MessageSerializer.serializeKvStateRequestFailure( - ctx.alloc(), - request.getRequestId(), - new UnknownKeyOrNamespace()); - - ctx.writeAndFlush(unknownKey); - } - } catch (Throwable t) { - try { - String stringifiedCause = ExceptionUtils.stringifyException(t); - String errMsg = "Failed to query state backend for query " + - request.getRequestId() + ". Caused by: " + stringifiedCause; - - ByteBuf err = MessageSerializer.serializeKvStateRequestFailure( - ctx.alloc(), request.getRequestId(), new RuntimeException(errMsg)); - - ctx.writeAndFlush(err); - } catch (IOException e) { - LOG.error("Failed to respond with the error after failed to query state backend", e); - } - } finally { - if (!success) { - stats.reportFailedRequest(); + responseFuture.completeExceptionally(new UnknownKeyOrNamespaceException(getServerName())); } } + return responseFuture; + } catch (Throwable t) { + String errMsg = "Error while processing request with ID " + requestId + + ". Caused by: " + ExceptionUtils.stringifyException(t); + responseFuture.completeExceptionally(new RuntimeException(errMsg)); + return responseFuture; } + } - @Override - public String toString() { - return "AsyncKvStateQueryTask{" + - ", request=" + request + - ", creationNanos=" + creationNanos + - '}'; - } - - /** - * Callback after query result has been written. - * - * <p>Gathers stats and logs errors. - */ - private class QueryResultWriteListener implements ChannelFutureListener { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - long durationNanos = System.nanoTime() - creationNanos; - long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS); - - if (future.isSuccess()) { - stats.reportSuccessfulRequest(durationMillis); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Query " + request + " failed after " + durationMillis + " ms", future.cause()); - } - - stats.reportFailedRequest(); - } - } - } + @Override + public void shutdown() { + // do nothing } } http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java index 4bf7e24..b4c548a 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java @@ -18,213 +18,93 @@ package org.apache.flink.queryablestate.server; -import org.apache.flink.queryablestate.messages.KvStateRequest; -import org.apache.flink.runtime.io.network.netty.NettyBufferPool; +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.messages.KvStateInternalRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; +import org.apache.flink.queryablestate.network.AbstractServerBase; +import org.apache.flink.queryablestate.network.AbstractServerHandler; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.KvStateServer; import org.apache.flink.runtime.query.KvStateServerAddress; import org.apache.flink.runtime.query.netty.KvStateRequestStats; import org.apache.flink.util.Preconditions; -import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; -import org.apache.flink.shaded.netty4.io.netty.channel.Channel; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; -import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; /** - * Netty-based server answering {@link KvStateRequest} messages. - * - * <p>Requests are handled by asynchronous query tasks (see {@link KvStateServerHandler.AsyncKvStateQueryTask}) - * that are executed by a separate query Thread pool. This pool is shared among - * all TCP connections. - * - * <p>The incoming pipeline looks as follows: - * <pre> - * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateServerHandler - * </pre> - * - * <p>Received binary messages are expected to contain a frame length field. Netty's - * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame before - * giving it to our {@link KvStateServerHandler}. - * - * <p>Connections are established and closed by the client. The server only - * closes the connection on a fatal failure that cannot be recovered. A - * server-side connection close is considered a failure by the client. + * The default implementation of the {@link KvStateServer}. */ -public class KvStateServerImpl implements KvStateServer { - - private static final Logger LOG = LoggerFactory.getLogger(KvStateServer.class); +@Internal +public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer { - /** Server config: low water mark. */ - private static final int LOW_WATER_MARK = 8 * 1024; + private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class); - /** Server config: high water mark. */ - private static final int HIGH_WATER_MARK = 32 * 1024; + /** The {@link KvStateRegistry} to query for state instances. */ + private final KvStateRegistry kvStateRegistry; - /** Netty's ServerBootstrap. */ - private final ServerBootstrap bootstrap; + private final KvStateRequestStats stats; - /** Query executor thread pool. */ - private final ExecutorService queryExecutor; - - /** Address of this server. */ - private KvStateServerAddress serverAddress; + private MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer; /** - * Creates the {@link KvStateServer}. + * Creates the state server. + * + * <p>The server is instantiated using reflection by the + * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, int, int, int, KvStateRegistry, KvStateRequestStats) + * QueryableStateUtils.startKvStateServer(InetAddress, int, int, int, KvStateRegistry, KvStateRequestStats)}. * * <p>The server needs to be started via {@link #start()} in order to bind * to the configured bind address. * - * @param bindAddress Address to bind to - * @param bindPort Port to bind to. Pick random port if 0. - * @param numEventLoopThreads Number of event loop threads - * @param numQueryThreads Number of query threads - * @param kvStateRegistry KvStateRegistry to query for KvState instances - * @param stats Statistics tracker + * @param bindAddress the address to listen to. + * @param bindPort the port to listen to. + * @param numEventLoopThreads number of event loop threads. + * @param numQueryThreads number of query threads. + * @param kvStateRegistry {@link KvStateRegistry} to query for state instances. + * @param stats the statistics collector. */ public KvStateServerImpl( - InetAddress bindAddress, - Integer bindPort, - Integer numEventLoopThreads, - Integer numQueryThreads, - KvStateRegistry kvStateRegistry, - KvStateRequestStats stats) { - - Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65536, "Port " + bindPort + - " is out of valid port range (0-65536)."); - - Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads."); - Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads."); - - Preconditions.checkNotNull(kvStateRegistry, "KvStateRegistry"); - Preconditions.checkNotNull(stats, "KvStateRequestStats"); - - NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads); - - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Flink KvStateServer EventLoop Thread %d") - .build(); - - NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory); - - queryExecutor = createQueryExecutor(numQueryThreads); - - // Shared between all channels - KvStateServerHandler serverHandler = new KvStateServerHandler( - kvStateRegistry, - queryExecutor, - stats); - - bootstrap = new ServerBootstrap() - // Bind address and port - .localAddress(bindAddress, bindPort) - // NIO server channels - .group(nioGroup) - .channel(NioServerSocketChannel.class) - // Server channel Options - .option(ChannelOption.ALLOCATOR, bufferPool) - // Child channel options - .childOption(ChannelOption.ALLOCATOR, bufferPool) - .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK) - .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK) - // See initializer for pipeline details - .childHandler(new KvStateServerChannelInitializer(serverHandler)); + final InetAddress bindAddress, + final Integer bindPort, + final Integer numEventLoopThreads, + final Integer numQueryThreads, + final KvStateRegistry kvStateRegistry, + final KvStateRequestStats stats) { + + super("Queryable State Server", bindAddress, bindPort, numEventLoopThreads, numQueryThreads); + this.stats = Preconditions.checkNotNull(stats); + this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry); } @Override - public void start() throws InterruptedException { - Channel channel = bootstrap.bind().sync().channel(); - - InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress(); - serverAddress = new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort()); + public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> initializeHandler() { + this.serializer = new MessageSerializer<>( + new KvStateInternalRequest.KvStateInternalRequestDeserializer(), + new KvStateResponse.KvStateResponseDeserializer()); + return new KvStateServerHandler(this, kvStateRegistry, serializer, stats); } - @Override - public KvStateServerAddress getAddress() { - if (serverAddress == null) { - throw new IllegalStateException("KvStateServer not started yet."); - } - - return serverAddress; + public MessageSerializer<KvStateInternalRequest, KvStateResponse> getSerializer() { + Preconditions.checkState(serializer != null, "Server " + getServerName() + " has not been started."); + return serializer; } @Override - public void shutDown() { - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0, 10, TimeUnit.SECONDS); - } - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } - - serverAddress = null; + public void start() throws InterruptedException { + super.start(); } - /** - * Creates a thread pool for the query execution. - * - * @param numQueryThreads Number of query threads. - * @return Thread pool for query execution - */ - private static ExecutorService createQueryExecutor(int numQueryThreads) { - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Flink KvStateServer Query Thread %d") - .build(); - - return Executors.newFixedThreadPool(numQueryThreads, threadFactory); + @Override + public KvStateServerAddress getServerAddress() { + return super.getServerAddress(); } - /** - * Channel pipeline initializer. - * - * <p>The request handler is shared, whereas the other handlers are created - * per channel. - */ - private static final class KvStateServerChannelInitializer extends ChannelInitializer<SocketChannel> { - - /** The shared request handler. */ - private final KvStateServerHandler sharedRequestHandler; - - /** - * Creates the channel pipeline initializer with the shared request handler. - * - * @param sharedRequestHandler Shared request handler. - */ - public KvStateServerChannelInitializer(KvStateServerHandler sharedRequestHandler) { - this.sharedRequestHandler = Preconditions.checkNotNull(sharedRequestHandler, "Request handler"); - } - - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline() - .addLast(new ChunkedWriteHandler()) - .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) - .addLast(sharedRequestHandler); - } + @Override + public void shutdown() { + super.shutdown(); } - }
