[
https://issues.apache.org/jira/browse/HIVE-16738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028171#comment-16028171
]
anishek commented on HIVE-16738:
--------------------------------
mvn
{code}
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.42</version>
</dependency>
{code}
start mysql with *--innodb-lock-wait-timeout=10*
{code}
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.exception.ExceptionUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class TwoConnections {
static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
interface Function {
void execute(Connection connection) throws SQLException;
}
private static void process(Function function) throws SQLException {
try (Connection con = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/anishek", "root", "")) {
con.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
function.execute(con);
return;
} catch (Exception e) {
System.out.println("on thread: " + Thread.currentThread().getName());
e.printStackTrace();
}
throw new RuntimeException("Failure in Execution");
}
static abstract class BaseFunction implements Function {
String threadName = Thread.currentThread().getName();
void log(String message) {
System.out.println(threadName + " :: " + message);
}
long read(Connection connection) throws SQLException {
try (Statement statement = connection.createStatement()) {
ResultSet resultSet =
statement.executeQuery("select NEXT_EVENT_ID from
NOTIFICATION_SEQUENCE;");
if (resultSet.next()) {
return resultSet.getLong(1);
}
}
throw new RuntimeException(
"could not read the value in " + Thread.currentThread().getName());
}
}
static class UpdateNotificationFunction extends BaseFunction {
private final CyclicBarrier barrier;
UpdateNotificationFunction(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void execute(Connection connection) throws SQLException {
try {
barrier.await(15L, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException
e) {
System.out.printf(
"Barrier Exception :: " + Thread.currentThread().getName() + " :: "
+ ExceptionUtils
.getStackTrace(e));
}
connection.setAutoCommit(false);
long current = read(connection);
log("Read value: " + current);
Statement statement = connection.createStatement();
String value = String.valueOf(current + 1);
statement.execute(
"update NOTIFICATION_SEQUENCE set NEXT_EVENT_ID=" + value
+ " where NNI_ID=1");
connection.commit();
log("committed " + value + " :: " + new Date());
}
}
public static void main(String[] args)
throws ClassNotFoundException, SQLException, InterruptedException {
int numberOfThreads = 20;
CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> System.out
.println("all threads reached the barrier , they will start executing
now"));
ExecutorService service = Executors.newFixedThreadPool(
numberOfThreads,
new ThreadFactoryBuilder().setNameFormat("thread-%d").build()
);
for (int i = 0; i < numberOfThreads; i++) {
int localTemp = i;
service.submit(() -> {
try {
process(new UpdateNotificationFunction(barrier));
} catch (SQLException e) {
System.out.println(String.valueOf(localTemp) + " :: failed with
exception + " +
ExceptionUtils.getStackTrace(e));
}
});
}
service.shutdown();
service.awaitTermination(1, TimeUnit.MINUTES);
}
}
{code}
> Notification ID generation in DBNotification might not be unique across HS2
> instances.
> --------------------------------------------------------------------------------------
>
> Key: HIVE-16738
> URL: https://issues.apache.org/jira/browse/HIVE-16738
> Project: Hive
> Issue Type: Bug
> Components: HiveServer2
> Affects Versions: 3.0.0
> Reporter: anishek
> Assignee: anishek
> Fix For: 3.0.0
>
>
> Going to explain the problem in scope of "replication" feature for hive 2
> that is being built, as it is easier to explain:
> To allow replication to work we need to set
> "hive.metastore.transactional.event.listeners" to DBNotificationListener.
> For use cases where there are multiple HiveServer2 Instances running
> {code}
> private void process(NotificationEvent event, ListenerEvent listenerEvent)
> throws MetaException {
> event.setMessageFormat(msgFactory.getMessageFormat());
> synchronized (NOTIFICATION_TBL_LOCK) {
> LOG.debug("DbNotificationListener: Processing : {}:{}",
> event.getEventId(),
> event.getMessage());
> HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event);
> }
> // Set the DB_NOTIFICATION_EVENT_ID for future reference by other
> listeners.
> if (event.isSetEventId()) {
> listenerEvent.putParameter(
> MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
> Long.toString(event.getEventId()));
> }
> }
> {code}
> the above code in DBNotificationListner having the object lock wont be
> guarantee enough to make sure that all events get a unique id. The
> transaction isolation level at the db "read-comitted" or "repeatable-read"
> would also not guarantee the same, unless a lock is at the db level
> preferably on table {{NOTIFICATION_SEQUENCE}} which only has one row.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)