Amraneze commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1481324978
@Abacn I pushed this PR to fix the issue and more integration test. We are
working on having a pool connection to determine the connection that we need.
Something like this.
```java
private static class JmsConnectionPool<T> implements Serializable {
private static final long serialVersionUID = 1L;
private static final int DEFAULT_MAX_POOL_SIZE = 10;
private static final int DEFAULT_INITIAL_POOL_SIZE = 20;
private JmsIO.Write<T> spec;
private final int maxPoolSize;
private final int initialPoolSize;
private List<JmsConnection<T>> jmsConnectionPool;
private List<JmsConnection<T>> usedJmsConnections = new ArrayList<>();
JmsConnectionPool(JmsIO.Write<T> spec, List<JmsConnection<T>>
jmsConnectionPool) {
this.spec = spec;
this.jmsConnectionPool = jmsConnectionPool;
this.maxPoolSize =
Optional.ofNullable(spec.getMaxPoolSize()).orElse(DEFAULT_MAX_POOL_SIZE);
this.initialPoolSize =
Optional.ofNullable(spec.getInitialPoolSize()).orElse(DEFAULT_INITIAL_POOL_SIZE);
}
static <T> JmsConnectionPool<T> create(JmsIO.Write<T> spec) {
int initialPoolSize =
Optional.ofNullable(spec.getInitialPoolSize()).orElse(DEFAULT_INITIAL_POOL_SIZE);
List<JmsConnection<T>> jmsConnectionPool = new
ArrayList<>(initialPoolSize);
for (int i = 0; i < initialPoolSize; i++) {
jmsConnectionPool.add(new JmsConnection<>(spec));
}
return new JmsConnectionPool<>(spec, jmsConnectionPool);
}
JmsConnection<T> getConnection() throws JmsIOException {
if (jmsConnectionPool.isEmpty()) {
if (usedJmsConnections.size() < maxPoolSize) {
jmsConnectionPool.add(new JmsConnection<>(spec));
} else {
throw new JmsIOException("Maximum pool connection size has been
reached");
}
}
JmsConnection<T> jmsConnection = jmsConnectionPool
.remove(jmsConnectionPool.size() - 1);
usedJmsConnections.add(jmsConnection);
return jmsConnection;
}
public boolean releaseConnection(JmsConnection<T> jmsConnection) {
jmsConnectionPool.add(jmsConnection);
return usedJmsConnections.remove(jmsConnection);
}
public boolean closeConnection(JmsConnection<T> jmsConnection) {
jmsConnection.close();
jmsConnectionPool.remove(jmsConnection);
return usedJmsConnections.remove(jmsConnection);
}
public void shutdown() throws JMSException {
usedJmsConnections.forEach(this::releaseConnection);
for (JmsConnection<T> jmsConnection : jmsConnectionPool) {
jmsConnection.close();
}
jmsConnectionPool.clear();
}
}
```
The function `closeConnection` will be called inside of
`this.connection.setExceptionListener`. What do you think about it ? We want
also to create latency or issues with other projects using JmsIO
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]