[
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16116359#comment-16116359
]
ASF GitHub Bot commented on FLINK-7343:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/4470#discussion_r131608304
--- Diff:
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * Handler that is forwarding inbound traffic from the source channel to
the target channel on remoteHost:remotePort
+ * and the responses in the opposite direction. All of the network traffic
can be blocked at any time using blocked
+ * flag.
+ */
+class NetworkFailureHandler extends SimpleChannelUpstreamHandler {
+ private static final String TARGET_CHANNEL_HANDLER_NAME =
"target_channel_handler";
+
+ // mapping between source and target channels, used for finding correct
target channel to use for given source.
+ private final Map<Channel, Channel> sourceToTargetChannels = new
ConcurrentHashMap<>();
+ private final Consumer<NetworkFailureHandler> onClose;
+ private final ClientSocketChannelFactory channelFactory;
+ private final String remoteHost;
+ private final int remotePort;
+
+ private final AtomicBoolean blocked;
+
+ public NetworkFailureHandler(
+ AtomicBoolean blocked,
+ Consumer<NetworkFailureHandler> onClose,
+ ClientSocketChannelFactory channelFactory,
+ String remoteHost,
+ int remotePort) {
+ this.blocked = blocked;
+ this.onClose = onClose;
+ this.channelFactory = channelFactory;
+ this.remoteHost = remoteHost;
+ this.remotePort = remotePort;
+ }
+
+ /**
+ * Closes the specified channel after all queued write requests are
flushed.
+ */
+ static void closeOnFlush(Channel channel) {
+ if (channel.isConnected()) {
+
channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
+ public void closeConnections() {
+ for (Map.Entry<Channel, Channel> entry :
sourceToTargetChannels.entrySet()) {
+ // target channel is closed on source's channel
channelClosed even
+ entry.getKey().close();
+ }
+ }
+
+ @Override
+ public void channelOpen(ChannelHandlerContext context,
ChannelStateEvent event) throws Exception {
+ // Suspend incoming traffic until connected to the remote host.
+ final Channel sourceChannel = event.getChannel();
+ sourceChannel.setReadable(false);
+
+ if (blocked.get()) {
+ sourceChannel.close();
+ return;
+ }
+
+ // Start the connection attempt.
+ ClientBootstrap targetConnectionBootstrap = new
ClientBootstrap(channelFactory);
+
targetConnectionBootstrap.getPipeline().addLast(TARGET_CHANNEL_HANDLER_NAME,
new TargetChannelHandler(event.getChannel(), blocked));
+ ChannelFuture connectFuture =
targetConnectionBootstrap.connect(new InetSocketAddress(remoteHost,
remotePort));
+ sourceToTargetChannels.put(sourceChannel,
connectFuture.getChannel());
+
+ connectFuture.addListener(future -> {
+ if (future.isSuccess()) {
+ // Connection attempt succeeded:
+ // Begin to accept incoming traffic.
+ sourceChannel.setReadable(true);
+ } else {
+ // Close the connection if the connection
attempt has failed.
+ sourceChannel.close();
+ }
+ });
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext context, MessageEvent
event) throws Exception {
+ if (blocked.get()) {
+ return;
+ }
+
+ ChannelBuffer msg = (ChannelBuffer) event.getMessage();
+ Channel targetChannel =
sourceToTargetChannels.get(event.getChannel());
+ if (targetChannel == null) {
+ throw new IllegalStateException();
--- End diff --
A cause message could be added.
> Kafka010ProducerITCase instability
> ----------------------------------
>
> Key: FLINK-7343
> URL: https://issues.apache.org/jira/browse/FLINK-7343
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Labels: test-stability
>
> As reported by [~till.rohrmann] in
> https://issues.apache.org/jira/browse/FLINK-6996 there seems to be a test
> instability with
> `Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink`
> https://travis-ci.org/tillrohrmann/flink/jobs/258538641
> It is probably related to log.flush intervals in Kafka, which delay flushing
> the data to files and potentially causing data loses on killing Kafka brokers
> in the tests.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)