[ 
https://issues.apache.org/jira/browse/TRAFODION-3334?focusedWorklogId=386254&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-386254
 ]

ASF GitHub Bot logged work on TRAFODION-3334:
---------------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Feb/20 21:44
            Start Date: 12/Feb/20 21:44
    Worklog Time Spent: 10m 
      Work Description: narendragoyal commented on pull request #1869: 
[TRAFODION-3334] Refactored and re-implemented monitor communication.
URL: https://github.com/apache/trafodion/pull/1869#discussion_r377995049
 
 

 ##########
 File path: core/sqf/monitor/linux/comm.cxx
 ##########
 @@ -0,0 +1,1757 @@
+///////////////////////////////////////////////////////////////////////////////
+//
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+//
+///////////////////////////////////////////////////////////////////////////////
+
+#include <iostream>
+
+using namespace std;
+
+#include <errno.h>
+#include <limits.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <sys/epoll.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
+#include "monlogging.h"
+#include "montrace.h"
+#include "comm.h"
+
+const char *EpollEventString( __uint32_t events );
+const char *EpollOpString( int op );
+
+CComm::CComm( void )
+      :epollFd_(-1)
+{
+    const char method_name[] = "CComm::CComm";
+    TRACE_ENTRY;
+
+    // Add eyecatcher sequence as a debugging aid
+    memcpy(&eyecatcher_, "COMM", 4);
+
+    epollFd_ = epoll_create1( EPOLL_CLOEXEC );
+    if ( epollFd_ < 0 )
+    {
+        char ebuff[256];
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf), "[%s@%d] epoll_create1(sendrecv) error: 
%s\n",
+            method_name, __LINE__, strerror_r( errno, ebuff, 256 ) );
+        mon_log_write( COMM_COMM_1, SQ_LOG_CRIT, buf );
+
+        mon_failure_exit();
+    }
+
+    TRACE_EXIT;
+}
+
+CComm::~CComm( void )
+{
+    const char method_name[] = "CComm::~CComm";
+    TRACE_ENTRY;
+
+    if (epollFd_ != -1)
+    {
+        close( epollFd_ );
+    }
+
+    // Alter eyecatcher sequence as a debugging aid to identify deleted object
+    memcpy(&eyecatcher_, "comm", 4);
+
+    TRACE_EXIT;
+}
+
+int CComm::Accept( int listenSock )
+{
+    const char method_name[] = "CComm::Accept";
+    TRACE_ENTRY;
+
+#if defined(_XOPEN_SOURCE_EXTENDED)
+#ifdef __LP64__
+    socklen_t  size;    // size of socket address
+#else
+    size_t   size;      // size of socket address
+#endif
+#else
+    int    size;        // size of socket address
+#endif
+    int csock; // connected socket
+    struct sockaddr_in  sockinfo;   // socket address info
+
+    size = sizeof(struct sockaddr *);
+    if ( getsockname( listenSock, (struct sockaddr *) &sockinfo, &size ) )
+    {
+        char buf[MON_STRING_BUF_SIZE];
+        int err = errno;
+        snprintf(buf, sizeof(buf), "[%s], getsockname() failed, errno=%d 
(%s).\n",
+                 method_name, err, strerror(err));
+        mon_log_write(COMM_ACCEPT_1, SQ_LOG_ERR, buf);
+        return ( -1 );
+    }
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        unsigned char *addrp = (unsigned char *) &sockinfo.sin_addr.s_addr;
+        trace_printf( "%s@%d - Accepting socket on addr=%d.%d.%d.%d,  
port=%d\n"
+                    , method_name, __LINE__
+                    , addrp[0]
+                    , addrp[1]
+                    , addrp[2]
+                    , addrp[3]
+                    , (int) ntohs( sockinfo.sin_port ) );
+    }
+
+    while ( ((csock = accept( listenSock
+                            , (struct sockaddr *) 0
+                            , (socklen_t *) 0 ) ) < 0) && (errno == EINTR) );
+
+    if ( csock > 0 )
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            unsigned char *addrp = (unsigned char *) &sockinfo.sin_addr.s_addr;
+            trace_printf( "%s@%d - Accepted socket on addr=%d.%d.%d.%d, "
+                          "port=%d, listenSock=%d, csock=%d\n"
+                        , method_name, __LINE__
+                        , addrp[0]
+                        , addrp[1]
+                        , addrp[2]
+                        , addrp[3]
+                        , (int) ntohs( sockinfo.sin_port )
+                        , listenSock
+                        , csock );
+        }
+
+        int nodelay = 1;
+        if ( setsockopt( csock
+                       , IPPROTO_TCP
+                       , TCP_NODELAY
+                       , (char *) &nodelay
+                       , sizeof(int) ) )
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            int err = errno;
+            snprintf(buf, sizeof(buf), "[%s], setsockopt() failed, errno=%d 
(%s).\n",
+                     method_name, err, strerror(err));
+            mon_log_write(COMM_ACCEPT_2, SQ_LOG_ERR, buf);
+            return ( -2 );
+        }
+
+        int reuse = 1;
+        if ( setsockopt( csock
+                       , SOL_SOCKET
+                       , SO_REUSEADDR
+                       , (char *) &reuse
+                       , sizeof(int) ) )
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            int err = errno;
+            snprintf(buf, sizeof(buf), "[%s], setsockopt() failed, errno=%d 
(%s).\n",
+                     method_name, err, strerror(err));
+            mon_log_write(COMM_ACCEPT_3, SQ_LOG_ERR, buf);
+            return ( -2 );
+        }
+    }
+
+    TRACE_EXIT;
+    return ( csock );
+}
+
+void CComm::ConnectLocal( int port )
+{
+    const char method_name[] = "CComm::ConnectLocal";
+    TRACE_ENTRY;
+
+    int  sock;     // socket
+    int  ret;      // returned value
+#if defined(_XOPEN_SOURCE_EXTENDED)
+#ifdef __LP64__
+    socklen_t  size;    // size of socket address
+#else
+    size_t   size;      // size of socket address
+#endif
+#else
+    int    size;        // size of socket address
+#endif
+    static int retries = 0;       // # times to retry connect
+    int     connect_failures = 0; // # failed connects
+    char   *p;     // getenv results
+    struct sockaddr_in  sockinfo; // socket address info
+    struct hostent *he;
+
+    size = sizeof(sockinfo);
+
+    if ( !retries )
+    {
+        p = getenv( "HPMP_CONNECT_RETRIES" );
+        if ( p ) retries = atoi( p );
+        else retries = 5;
+    }
+
+    sock = socket( AF_INET, SOCK_STREAM, 0 );
+    if ( sock < 0 )
+    {
+        char la_buf[MON_STRING_BUF_SIZE];
+        int err = errno;
+        sprintf( la_buf, "[%s], socket() failed! errno=%d (%s)\n"
+               , method_name, err, strerror( err ));
+        mon_log_write( COMM_CONNECTLOCAL_1, SQ_LOG_CRIT, la_buf );
+
+        mon_failure_exit();
+    }
+
+    he = gethostbyname( "localhost" );
+    if ( !he )
+    {
+        char ebuff[256];
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf), "[%s@%d] gethostbyname(%s) error: %s\n",
+            method_name, __LINE__, "localhost", strerror_r( h_errno, ebuff, 
256 ) );
+        mon_log_write( COMM_CONNECTLOCAL_2, SQ_LOG_CRIT, buf );
+
+        mon_failure_exit();
+    }
+
+    // Connect socket.
+    memset( (char *) &sockinfo, 0, size );
+    memcpy( (char *) &sockinfo.sin_addr, (char *) he->h_addr, 4 );
+    sockinfo.sin_family = AF_INET;
+    sockinfo.sin_port = htons( (unsigned short) port );
+
+    connect_failures = 0;
+    ret = 1;
+    while ( ret != 0 && connect_failures <= 10 )
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d - Connecting to localhost addr=%d.%d.%d.%d, 
port=%d, connect_failures=%d\n"
+                        , method_name, __LINE__
+                        , (int)((unsigned char *)he->h_addr)[0]
+                        , (int)((unsigned char *)he->h_addr)[1]
+                        , (int)((unsigned char *)he->h_addr)[2]
+                        , (int)((unsigned char *)he->h_addr)[3]
+                        , port
+                        , connect_failures );
+        }
+
+        ret = connect( sock, (struct sockaddr *) &sockinfo, size );
+        if ( ret == 0 ) break;
+        if ( errno == EINTR )
+        {
+            ++connect_failures;
+        }
+        else
+        {
+            char la_buf[MON_STRING_BUF_SIZE];
+            int err = errno;
+            sprintf( la_buf, "[%s], connect() failed! errno=%d (%s)\n"
+                   , method_name, err, strerror( err ));
+            mon_log_write(COMM_CONNECTLOCAL_3, SQ_LOG_CRIT, la_buf);
+
+            mon_failure_exit();
+        }
+    }
+
+    close( sock );
+
+    TRACE_EXIT;
+}
+
+int CComm::Connect( const char *portName, bool doRetries )
+{
+    const char method_name[] = "CComm::Connect";
+    TRACE_ENTRY;
+
+    int  sock;      // socket
+    int  ret;       // returned value
+    int  nodelay = 1; // sockopt reuse option
 
 Review comment:
   typo in the comment - not a 'reuse' option :)
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 386254)
    Time Spent: 2h 50m  (was: 2h 40m)

> Communication IO between monitor processes must use timeouts and retries
> ------------------------------------------------------------------------
>
>                 Key: TRAFODION-3334
>                 URL: https://issues.apache.org/jira/browse/TRAFODION-3334
>             Project: Apache Trafodion
>          Issue Type: Bug
>          Components: foundation
>    Affects Versions: 2.4
>            Reporter: Gonzalo E Correa
>            Priority: Major
>             Fix For: 2.4
>
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Most communication channels used by monitor processes to exchange cluster 
> state information and to handle failure detection must be changed to 
> asynchronous IO with timeouts and retries to allow for the removal of a 
> monitor process from the cluster communication. This is to prevent a 'Sync 
> Thread Timeout' failure of the entire cluster instance when a monitor 
> process or its host server becomes unresponsive due to a server or network 
> failure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to