[ 
https://issues.apache.org/jira/browse/CAMEL-8088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob Browning updated CAMEL-8088:
--------------------------------
    Description: 
In our production system we have seen cases where the FTP thread is waiting for 
a response indefinitely despite having set _soTimeout_ on the connection. On 
investigation this is due to a condition that can occur where a socket is able 
to connect yet a firewall or the ilk then blocks further traffic.

This can be overcome by setting the property _ftpClient.defaultTimeout_ to a 
non-zero value.

It should be the case where if upon initial socket connection no response 
occurs that the socket should be deemed dead, however this is not the case.

When the following exception is thrown during initial connect to an FTP server, 
after the socket has connected but whilst awaiting the initial reply, it can 
leave the RemoteFileProducer in a state where it is connected but not logged in 
and no reconnect is attempted. If the soTimeout as set by 
_ftpClient.defaultTimeout_ is zero, a subsequent command can then wait for a 
reply indefinitely.

{noformat}
Caused by: java.io.IOException: Timed out waiting for initial connect reply
        at org.apache.commons.net.ftp.FTP._connectAction_(FTP.java:389) 
~[commons-net-3.1.jar:3.1]
        at 
org.apache.commons.net.ftp.FTPClient._connectAction_(FTPClient.java:796) 
~[commons-net-3.1.jar:3.1]
        at org.apache.commons.net.SocketClient.connect(SocketClient.java:172) 
~[commons-net-3.1.jar:3.1]
        at org.apache.commons.net.SocketClient.connect(SocketClient.java:192) 
~[commons-net-3.1.jar:3.1]
        at 
org.apache.camel.component.file.remote.FtpOperations.connect(FtpOperations.java:95)
 ~[camel-ftp-2.13.1.jar:2.13.1]
{noformat}

The RemoteFileProducer will enter this block as the loggedIn state has not yet 
been reached, however the existing broken socket is reused.

{code}
        // recover by re-creating operations which should most likely be able 
to recover
        if (!loggedIn) {
            log.debug("Trying to recover connection to: {} with a fresh 
client.", getEndpoint());
            setOperations(getEndpoint().createRemoteFileOperations());
            connectIfNecessary();
        }
{code}

Yet the _connectIfNecessary()_ method will return immediately since the check 
condition is based on socket connection and takes no account of whether login 
was achieved so the 'dead' socket is reused.

{code}
    protected void connectIfNecessary() throws 
GenericFileOperationFailedException {
        // This will be skipped when loggedIn = false and the socket is 
connected
        if (!getOperations().isConnected()) {
            log.debug("Not already connected/logged in. Connecting to: {}", 
getEndpoint());
            RemoteFileConfiguration config = getEndpoint().getConfiguration();
            loggedIn = getOperations().connect(config);
            if (!loggedIn) {
                return;
            }
            log.info("Connected and logged in to: " + getEndpoint());
        }
    }
{code}

A dirty test that reproduces this blocking condition:

{code}
package ftp;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.commons.net.ftp.FTPClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockftpserver.fake.FakeFtpServer;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.net.SocketFactory;

import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class FtpInitialConnectTimeoutTest extends CamelTestSupport {

  private static final int CONNECT_TIMEOUT = 11223;

  /**
   * Create the answer for the socket factory that causes a 
SocketTimeoutException to occur in connect.
   */
  private static class SocketAnswer implements Answer<Socket> {
    @Override
    public Socket answer(InvocationOnMock invocation) throws Throwable {
      final Socket socket = Mockito.spy(new Socket());
      final AtomicBoolean timeout = new AtomicBoolean();

      try {
        doAnswer(new Answer<InputStream>() {
          @Override
          public InputStream answer(InvocationOnMock invocation) throws 
Throwable {
            final InputStream stream = (InputStream) 
invocation.callRealMethod();

            InputStream inputStream = new InputStream() {
              @Override
              public int read() throws IOException {
                if (timeout.get()) {
                  // emulate a timeout occuring in _getReply()
                  throw new SocketTimeoutException();
                }
                return stream.read();
              }
            };

            return inputStream;
          }
        }).when(socket).getInputStream();
      } catch (IOException ignored) {
      }

      try {
        doAnswer(new Answer() {
          @Override
          public Object answer(InvocationOnMock invocation) throws Throwable {
            if ((Integer) invocation.getArguments()[0] == CONNECT_TIMEOUT) {
              // setting of connect timeout
              timeout.set(true);
            } else {
              // non-connect timeout
              timeout.set(false);
            }
            return invocation.callRealMethod();
          }
        }).when(socket).setSoTimeout(anyInt());
      } catch (SocketException e) {
        throw new RuntimeException(e);
      }
      return socket;
    }
  }

  private FakeFtpServer fakeFtpServer;

  @Override
  @Before
  public void setUp() throws Exception {
    fakeFtpServer = new FakeFtpServer();
    fakeFtpServer.setServerControlPort(0);
    fakeFtpServer.start();

    super.setUp();
  }

  @Override
  @After
  public void tearDown() throws Exception {
    super.tearDown();
    if (fakeFtpServer != null) {
      fakeFtpServer.stop();
    }
  }

  @Test
  public void testName() throws Exception {
    sendBody("direct:start", "test");
  }

  private FTPClient mockedClient() throws IOException {
    FTPClient client = new FTPClient();
    client.setSocketFactory(createSocketFactory());
    return client;
  }

  private SocketFactory createSocketFactory() throws IOException {
    SocketFactory socketFactory = mock(SocketFactory.class);
    when(socketFactory.createSocket()).thenAnswer(new SocketAnswer());
    return socketFactory;
  }

  @Override
  protected JndiRegistry createRegistry() throws Exception {
    JndiRegistry registry = super.createRegistry();
    registry.bind("mocked", mockedClient());
    return registry;
  }

  @Override
  protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
      @Override
      public void configure() throws Exception {
        from("direct:start")
            .to("ftp://localhost:" + fakeFtpServer.getServerControlPort()
                + "?ftpClient=#mocked"
                + "&soTimeout=1234&"
                + "connectTimeout=" + CONNECT_TIMEOUT);
      }
    };
  }
}
{code}

  was:
In our production system we have seen cases where the FTP thread is waiting for 
a response indefinitely despite having set _soTimeout_ on the connection. On 
investigation this is due to a condition that can occur where a socket is able 
to connect yet a firewall or the ilk then blocks further traffic.

This can be overcome by setting the property _ftpClient.defaultTimeout_ to a 
non-zero value.

It should be the case where if upon initial socket connection no response 
occurs that the socket should be deemed dead, however this is not the case.

When the following exception is thrown during initial connect to an FTP server, 
after the socket has connected but whilst awaiting the initial reply, it can 
leave the RemoteFileProducer in a state where it is connected but not logged in 
and no reconnect is attempted. If the soTimeout as set by 
_ftpClient.defaultTimeout_ is zero, a subsequent command can then wait for a 
reply indefinitely.

{pre}
Caused by: java.io.IOException: Timed out waiting for initial connect reply
        at org.apache.commons.net.ftp.FTP._connectAction_(FTP.java:389) 
~[commons-net-3.1.jar:3.1]
        at 
org.apache.commons.net.ftp.FTPClient._connectAction_(FTPClient.java:796) 
~[commons-net-3.1.jar:3.1]
        at org.apache.commons.net.SocketClient.connect(SocketClient.java:172) 
~[commons-net-3.1.jar:3.1]
        at org.apache.commons.net.SocketClient.connect(SocketClient.java:192) 
~[commons-net-3.1.jar:3.1]
        at 
org.apache.camel.component.file.remote.FtpOperations.connect(FtpOperations.java:95)
 ~[camel-ftp-2.13.1.jar:2.13.1]
{pre}

The RemoteFileProducer will enter this block as the loggedIn state has not yet 
been reached, however the existing broken socket is reused.

{code}
        // recover by re-creating operations which should most likely be able 
to recover
        if (!loggedIn) {
            log.debug("Trying to recover connection to: {} with a fresh 
client.", getEndpoint());
            setOperations(getEndpoint().createRemoteFileOperations());
            connectIfNecessary();
        }
{code}

Yet the _connectIfNecessary()_ method will return immediately since the check 
condition is based on socket connection and takes no account of whether login 
was achieved so the 'dead' socket is reused.

{code}
    protected void connectIfNecessary() throws 
GenericFileOperationFailedException {
        // This will be skipped when loggedIn = false and the socket is 
connected
        if (!getOperations().isConnected()) {
            log.debug("Not already connected/logged in. Connecting to: {}", 
getEndpoint());
            RemoteFileConfiguration config = getEndpoint().getConfiguration();
            loggedIn = getOperations().connect(config);
            if (!loggedIn) {
                return;
            }
            log.info("Connected and logged in to: " + getEndpoint());
        }
    }
{code}

A dirty test that reproduces this blocking condition:

{code}
package ftp;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.commons.net.ftp.FTPClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockftpserver.fake.FakeFtpServer;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.net.SocketFactory;

import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class FtpInitialConnectTimeoutTest extends CamelTestSupport {

  private static final int CONNECT_TIMEOUT = 11223;

  /**
   * Create the answer for the socket factory that causes a 
SocketTimeoutException to occur in connect.
   */
  private static class SocketAnswer implements Answer<Socket> {
    @Override
    public Socket answer(InvocationOnMock invocation) throws Throwable {
      final Socket socket = Mockito.spy(new Socket());
      final AtomicBoolean timeout = new AtomicBoolean();

      try {
        doAnswer(new Answer<InputStream>() {
          @Override
          public InputStream answer(InvocationOnMock invocation) throws 
Throwable {
            final InputStream stream = (InputStream) 
invocation.callRealMethod();

            InputStream inputStream = new InputStream() {
              @Override
              public int read() throws IOException {
                if (timeout.get()) {
                  // emulate a timeout occuring in _getReply()
                  throw new SocketTimeoutException();
                }
                return stream.read();
              }
            };

            return inputStream;
          }
        }).when(socket).getInputStream();
      } catch (IOException ignored) {
      }

      try {
        doAnswer(new Answer() {
          @Override
          public Object answer(InvocationOnMock invocation) throws Throwable {
            if ((Integer) invocation.getArguments()[0] == CONNECT_TIMEOUT) {
              // setting of connect timeout
              timeout.set(true);
            } else {
              // non-connect timeout
              timeout.set(false);
            }
            return invocation.callRealMethod();
          }
        }).when(socket).setSoTimeout(anyInt());
      } catch (SocketException e) {
        throw new RuntimeException(e);
      }
      return socket;
    }
  }

  private FakeFtpServer fakeFtpServer;

  @Override
  @Before
  public void setUp() throws Exception {
    fakeFtpServer = new FakeFtpServer();
    fakeFtpServer.setServerControlPort(0);
    fakeFtpServer.start();

    super.setUp();
  }

  @Override
  @After
  public void tearDown() throws Exception {
    super.tearDown();
    if (fakeFtpServer != null) {
      fakeFtpServer.stop();
    }
  }

  @Test
  public void testName() throws Exception {
    sendBody("direct:start", "test");
  }

  private FTPClient mockedClient() throws IOException {
    FTPClient client = new FTPClient();
    client.setSocketFactory(createSocketFactory());
    return client;
  }

  private SocketFactory createSocketFactory() throws IOException {
    SocketFactory socketFactory = mock(SocketFactory.class);
    when(socketFactory.createSocket()).thenAnswer(new SocketAnswer());
    return socketFactory;
  }

  @Override
  protected JndiRegistry createRegistry() throws Exception {
    JndiRegistry registry = super.createRegistry();
    registry.bind("mocked", mockedClient());
    return registry;
  }

  @Override
  protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
      @Override
      public void configure() throws Exception {
        from("direct:start")
            .to("ftp://localhost:" + fakeFtpServer.getServerControlPort()
                + "?ftpClient=#mocked"
                + "&soTimeout=1234&"
                + "connectTimeout=" + CONNECT_TIMEOUT);
      }
    };
  }
}
{code}


> FTP can wait indefinitely when connection timeout occurs during connect
> -----------------------------------------------------------------------
>
>                 Key: CAMEL-8088
>                 URL: https://issues.apache.org/jira/browse/CAMEL-8088
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-ftp
>    Affects Versions: 2.13.3
>            Reporter: Bob Browning
>            Priority: Minor
>
> In our production system we have seen cases where the FTP thread is waiting 
> for a response indefinitely despite having set _soTimeout_ on the connection. 
> On investigation this is due to a condition that can occur where a socket is 
> able to connect yet a firewall or the ilk then blocks further traffic.
> This can be overcome by setting the property _ftpClient.defaultTimeout_ to a 
> non-zero value.
> It should be the case where if upon initial socket connection no response 
> occurs that the socket should be deemed dead, however this is not the case.
> When the following exception is thrown during initial connect to an FTP 
> server, after the socket has connected but whilst awaiting the initial reply, 
> it can leave the RemoteFileProducer in a state where it is connected but not 
> logged in and no reconnect is attempted. If the soTimeout as set by 
> _ftpClient.defaultTimeout_ is zero, a subsequent command can then wait for a 
> reply indefinitely.
> {noformat}
> Caused by: java.io.IOException: Timed out waiting for initial connect reply
>       at org.apache.commons.net.ftp.FTP._connectAction_(FTP.java:389) 
> ~[commons-net-3.1.jar:3.1]
>       at 
> org.apache.commons.net.ftp.FTPClient._connectAction_(FTPClient.java:796) 
> ~[commons-net-3.1.jar:3.1]
>       at org.apache.commons.net.SocketClient.connect(SocketClient.java:172) 
> ~[commons-net-3.1.jar:3.1]
>       at org.apache.commons.net.SocketClient.connect(SocketClient.java:192) 
> ~[commons-net-3.1.jar:3.1]
>       at 
> org.apache.camel.component.file.remote.FtpOperations.connect(FtpOperations.java:95)
>  ~[camel-ftp-2.13.1.jar:2.13.1]
> {noformat}
> The RemoteFileProducer will enter this block as the loggedIn state has not 
> yet been reached, however the existing broken socket is reused.
> {code}
>         // recover by re-creating operations which should most likely be able 
> to recover
>         if (!loggedIn) {
>             log.debug("Trying to recover connection to: {} with a fresh 
> client.", getEndpoint());
>             setOperations(getEndpoint().createRemoteFileOperations());
>             connectIfNecessary();
>         }
> {code}
> Yet the _connectIfNecessary()_ method will return immediately since the check 
> condition is based on socket connection and takes no account of whether login 
> was achieved so the 'dead' socket is reused.
> {code}
>     protected void connectIfNecessary() throws 
> GenericFileOperationFailedException {
>         // This will be skipped when loggedIn = false and the socket is 
> connected
>         if (!getOperations().isConnected()) {
>             log.debug("Not already connected/logged in. Connecting to: {}", 
> getEndpoint());
>             RemoteFileConfiguration config = getEndpoint().getConfiguration();
>             loggedIn = getOperations().connect(config);
>             if (!loggedIn) {
>                 return;
>             }
>             log.info("Connected and logged in to: " + getEndpoint());
>         }
>     }
> {code}
> A dirty test that reproduces this blocking condition:
> {code}
> package ftp;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.JndiRegistry;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.apache.commons.net.ftp.FTPClient;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
> import org.mockftpserver.fake.FakeFtpServer;
> import org.mockito.Mockito;
> import org.mockito.invocation.InvocationOnMock;
> import org.mockito.stubbing.Answer;
> import java.io.IOException;
> import java.io.InputStream;
> import java.net.Socket;
> import java.net.SocketException;
> import java.net.SocketTimeoutException;
> import java.util.concurrent.atomic.AtomicBoolean;
> import javax.net.SocketFactory;
> import static org.mockito.Matchers.anyInt;
> import static org.mockito.Mockito.doAnswer;
> import static org.mockito.Mockito.mock;
> import static org.mockito.Mockito.when;
> public class FtpInitialConnectTimeoutTest extends CamelTestSupport {
>   private static final int CONNECT_TIMEOUT = 11223;
>   /**
>    * Create the answer for the socket factory that causes a 
> SocketTimeoutException to occur in connect.
>    */
>   private static class SocketAnswer implements Answer<Socket> {
>     @Override
>     public Socket answer(InvocationOnMock invocation) throws Throwable {
>       final Socket socket = Mockito.spy(new Socket());
>       final AtomicBoolean timeout = new AtomicBoolean();
>       try {
>         doAnswer(new Answer<InputStream>() {
>           @Override
>           public InputStream answer(InvocationOnMock invocation) throws 
> Throwable {
>             final InputStream stream = (InputStream) 
> invocation.callRealMethod();
>             InputStream inputStream = new InputStream() {
>               @Override
>               public int read() throws IOException {
>                 if (timeout.get()) {
>                   // emulate a timeout occuring in _getReply()
>                   throw new SocketTimeoutException();
>                 }
>                 return stream.read();
>               }
>             };
>             return inputStream;
>           }
>         }).when(socket).getInputStream();
>       } catch (IOException ignored) {
>       }
>       try {
>         doAnswer(new Answer() {
>           @Override
>           public Object answer(InvocationOnMock invocation) throws Throwable {
>             if ((Integer) invocation.getArguments()[0] == CONNECT_TIMEOUT) {
>               // setting of connect timeout
>               timeout.set(true);
>             } else {
>               // non-connect timeout
>               timeout.set(false);
>             }
>             return invocation.callRealMethod();
>           }
>         }).when(socket).setSoTimeout(anyInt());
>       } catch (SocketException e) {
>         throw new RuntimeException(e);
>       }
>       return socket;
>     }
>   }
>   private FakeFtpServer fakeFtpServer;
>   @Override
>   @Before
>   public void setUp() throws Exception {
>     fakeFtpServer = new FakeFtpServer();
>     fakeFtpServer.setServerControlPort(0);
>     fakeFtpServer.start();
>     super.setUp();
>   }
>   @Override
>   @After
>   public void tearDown() throws Exception {
>     super.tearDown();
>     if (fakeFtpServer != null) {
>       fakeFtpServer.stop();
>     }
>   }
>   @Test
>   public void testName() throws Exception {
>     sendBody("direct:start", "test");
>   }
>   private FTPClient mockedClient() throws IOException {
>     FTPClient client = new FTPClient();
>     client.setSocketFactory(createSocketFactory());
>     return client;
>   }
>   private SocketFactory createSocketFactory() throws IOException {
>     SocketFactory socketFactory = mock(SocketFactory.class);
>     when(socketFactory.createSocket()).thenAnswer(new SocketAnswer());
>     return socketFactory;
>   }
>   @Override
>   protected JndiRegistry createRegistry() throws Exception {
>     JndiRegistry registry = super.createRegistry();
>     registry.bind("mocked", mockedClient());
>     return registry;
>   }
>   @Override
>   protected RouteBuilder createRouteBuilder() throws Exception {
>     return new RouteBuilder() {
>       @Override
>       public void configure() throws Exception {
>         from("direct:start")
>             .to("ftp://localhost:" + fakeFtpServer.getServerControlPort()
>                 + "?ftpClient=#mocked"
>                 + "&soTimeout=1234&"
>                 + "connectTimeout=" + CONNECT_TIMEOUT);
>       }
>     };
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to