[ 
https://issues.apache.org/jira/browse/HIVE-16738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028171#comment-16028171
 ] 

anishek commented on HIVE-16738:
--------------------------------

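Maven dependencies used by the reproduction below (Guava is also needed on the classpath for ThreadFactoryBuilder):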
{code}
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.5</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.42</version>
        </dependency>
{code}

start mysql with *--innodb-lock-wait-timeout=10*

{code}
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.exception.ExceptionUtils;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TwoConnections {

  static {
    try {
      Class.forName("com.mysql.jdbc.Driver");
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }

  interface Function {
    void execute(Connection connection) throws SQLException;
  }

  private static void process(Function function) throws SQLException {
    try (Connection con = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/anishek", "root", "")) {
      con.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
      function.execute(con);
      return;
    } catch (Exception e) {
      System.out.println("on thread: " + Thread.currentThread().getName());
      e.printStackTrace();
    }
    throw new RuntimeException("Failure in Execution");
  }

  static abstract class BaseFunction implements Function {
    String threadName = Thread.currentThread().getName();

    void log(String message) {
      System.out.println(threadName + " :: " + message);
    }

    long read(Connection connection) throws SQLException {
      try (Statement statement = connection.createStatement()) {
        ResultSet resultSet =
            statement.executeQuery("select NEXT_EVENT_ID from 
NOTIFICATION_SEQUENCE;");
        if (resultSet.next()) {
          return resultSet.getLong(1);
        }
      }
      throw new RuntimeException(
          "could not read the value in " + Thread.currentThread().getName());
    }
  }

  static class UpdateNotificationFunction extends BaseFunction {

    private final CyclicBarrier barrier;

    UpdateNotificationFunction(CyclicBarrier barrier) {
      this.barrier = barrier;
    }

    @Override
    public void execute(Connection connection) throws SQLException {
      try {
        barrier.await(15L, TimeUnit.SECONDS);
      } catch (InterruptedException | BrokenBarrierException | TimeoutException 
e) {
        System.out.printf(
            "Barrier Exception :: " + Thread.currentThread().getName() + " :: " 
+ ExceptionUtils
                .getStackTrace(e));
      }
      connection.setAutoCommit(false);
      long current = read(connection);
      log("Read value: " + current);
      Statement statement = connection.createStatement();
      String value = String.valueOf(current + 1);
      statement.execute(
          "update NOTIFICATION_SEQUENCE set NEXT_EVENT_ID=" + value
              + " where NNI_ID=1");
      connection.commit();
      log("committed " + value + " :: " + new Date());
    }
  }

  public static void main(String[] args)
      throws ClassNotFoundException, SQLException, InterruptedException {

    int numberOfThreads = 20;
    CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> System.out
        .println("all threads reached the barrier , they will start executing 
now"));

    ExecutorService service = Executors.newFixedThreadPool(
        numberOfThreads,
        new ThreadFactoryBuilder().setNameFormat("thread-%d").build()
    );

    for (int i = 0; i < numberOfThreads; i++) {
      int localTemp = i;
      service.submit(() -> {
        try {
          process(new UpdateNotificationFunction(barrier));
        } catch (SQLException e) {
          System.out.println(String.valueOf(localTemp) + " :: failed with 
exception + " +
              ExceptionUtils.getStackTrace(e));
        }
      });
    }

    service.shutdown();
    service.awaitTermination(1, TimeUnit.MINUTES);
  }
}
{code}

> Notification ID generation in DBNotification might not be unique across HS2 
> instances.
> --------------------------------------------------------------------------------------
>
>                 Key: HIVE-16738
>                 URL: https://issues.apache.org/jira/browse/HIVE-16738
>             Project: Hive
>          Issue Type: Bug
>          Components: HiveServer2
>    Affects Versions: 3.0.0
>            Reporter: anishek
>            Assignee: anishek
>             Fix For: 3.0.0
>
>
> Going to explain the problem in scope of "replication" feature for hive 2 
> that is being built, as it is easier to explain:
> To allow replication to work we need to set 
> "hive.metastore.transactional.event.listeners"  to DBNotificationListener. 
> For use cases where there are multiple HiveServer2 instances running, 
> consider the following code:
> {code}
>  private void process(NotificationEvent event, ListenerEvent listenerEvent) 
> throws MetaException {
>     event.setMessageFormat(msgFactory.getMessageFormat());
>     synchronized (NOTIFICATION_TBL_LOCK) {
>       LOG.debug("DbNotificationListener: Processing : {}:{}", 
> event.getEventId(),
>           event.getMessage());
>       HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event);
>     }
>       // Set the DB_NOTIFICATION_EVENT_ID for future reference by other 
> listeners.
>       if (event.isSetEventId()) {
>         listenerEvent.putParameter(
>             MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
>             Long.toString(event.getEventId()));
>       }
>   }
> {code}
> The object lock in the above DBNotificationListener code is not enough to 
> guarantee that all events get a unique id. The transaction isolation levels 
> "read-committed" and "repeatable-read" at the db would also not guarantee 
> this, unless a lock is taken at the db level, preferably on the table 
> {{NOTIFICATION_SEQUENCE}}, which only has one row.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to