Added: aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/ManagedJPADataSourceSetup.java URL: http://svn.apache.org/viewvc/aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/ManagedJPADataSourceSetup.java?rev=1738630&view=auto ============================================================================== --- aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/ManagedJPADataSourceSetup.java (added) +++ aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/ManagedJPADataSourceSetup.java Mon Apr 11 19:36:03 2016 @@ -0,0 +1,222 @@ +package org.apache.aries.tx.control.jpa.xa.impl; + +import static java.util.Optional.ofNullable; +import static java.util.concurrent.TimeUnit.HOURS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.osgi.framework.Constants.OBJECTCLASS; +import static org.osgi.service.jdbc.DataSourceFactory.OSGI_JDBC_DRIVER_CLASS; +import static org.osgi.service.transaction.control.jdbc.JDBCConnectionProviderFactory.CONNECTION_LIFETIME; +import static org.osgi.service.transaction.control.jdbc.JDBCConnectionProviderFactory.CONNECTION_POOLING_ENABLED; +import static org.osgi.service.transaction.control.jdbc.JDBCConnectionProviderFactory.CONNECTION_TIMEOUT; +import static org.osgi.service.transaction.control.jdbc.JDBCConnectionProviderFactory.IDLE_TIMEOUT; +import static org.osgi.service.transaction.control.jdbc.JDBCConnectionProviderFactory.MAX_CONNECTIONS; +import static org.osgi.service.transaction.control.jdbc.JDBCConnectionProviderFactory.MIN_CONNECTIONS; +import static org.osgi.service.transaction.control.jdbc.JDBCConnectionProviderFactory.USE_DRIVER; + +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.TimeUnit; 
+import java.util.concurrent.atomic.AtomicReference; + +import javax.sql.DataSource; + +import org.apache.aries.tx.control.jdbc.xa.connection.impl.XADataSourceMapper; +import org.osgi.framework.BundleContext; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceReference; +import org.osgi.service.cm.ConfigurationException; +import org.osgi.service.jdbc.DataSourceFactory; +import org.osgi.service.transaction.control.TransactionException; +import org.osgi.util.tracker.ServiceTracker; +import org.osgi.util.tracker.ServiceTrackerCustomizer; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; + +public class ManagedJPADataSourceSetup implements LifecycleAware, + ServiceTrackerCustomizer<DataSourceFactory, ManagedJPAEMFLocator> { + + private final BundleContext context; + private final String pid; + private final Properties jdbcProperties; + private final Map<String, Object> baseJPAProperties; + private final Map<String, Object> providerProperties; + + private final ServiceTracker<DataSourceFactory, ManagedJPAEMFLocator> dsfTracker; + private final AtomicReference<ServiceReference<DataSourceFactory>> activeDsf = new AtomicReference<>(); + + public ManagedJPADataSourceSetup(BundleContext context, String pid, Properties jdbcProperties, + Map<String, Object> baseJPAProperties, Map<String, Object> providerProperties) throws InvalidSyntaxException, ConfigurationException { + this.context = context; + this.pid = pid; + this.jdbcProperties = jdbcProperties; + this.baseJPAProperties = baseJPAProperties; + this.providerProperties = providerProperties; + + String targetFilter = (String) providerProperties.get(ManagedServiceFactoryImpl.DSF_TARGET_FILTER); + if (targetFilter == null) { + String driver = (String) providerProperties.get(OSGI_JDBC_DRIVER_CLASS); + if (driver == null) { + ManagedServiceFactoryImpl.LOG.error("The configuration {} must specify a target filter or a JDBC driver class", pid); + throw new 
ConfigurationException(OSGI_JDBC_DRIVER_CLASS, + "The configuration must specify either a target filter or a JDBC driver class"); + } + targetFilter = "(" + OSGI_JDBC_DRIVER_CLASS + "=" + driver + ")"; + } + + targetFilter = "(&(" + OBJECTCLASS + "=" + DataSourceFactory.class.getName() + ")" + targetFilter + ")"; + + this.dsfTracker = new ServiceTracker<>(context, context.createFilter(targetFilter), this); + } + + public void start() { + dsfTracker.open(); + } + + public void stop() { + dsfTracker.close(); + } + + @Override + public ManagedJPAEMFLocator addingService(ServiceReference<DataSourceFactory> reference) { + DataSourceFactory service = context.getService(reference); + ManagedJPAEMFLocator toReturn; + try { + toReturn = new ManagedJPAEMFLocator(context, pid, + getJPAProperties(service), providerProperties); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + return null; + } + updateService(reference, toReturn); + + return toReturn; + } + + private void updateService(ServiceReference<DataSourceFactory> reference, ManagedJPAEMFLocator locator) { + boolean setDsf; + synchronized (this) { + setDsf = activeDsf.compareAndSet(null, reference); + } + try { + if (setDsf) { + locator.start(); + } + } catch (Exception e) { + ManagedServiceFactoryImpl.LOG.error("An error occurred when creating the connection provider for {}.", pid, e); + activeDsf.compareAndSet(reference, null); + throw new IllegalStateException("An error occurred when creating the connection provider", e); + } + } + + private Map<String, Object> getJPAProperties(DataSourceFactory dsf) { + Map<String, Object> props = new HashMap<>(baseJPAProperties); + + DataSource unpooled; + try { + if (toBoolean(providerProperties, USE_DRIVER, false)) { + throw new TransactionException("The Database must use an XA connection"); + } else { + unpooled = new XADataSourceMapper(dsf.createXADataSource(jdbcProperties)); + } + } catch (SQLException sqle) { + throw new 
TransactionException("Unable to create the JDBC resource provider", sqle); + } + + DataSource toUse = poolIfNecessary(providerProperties, unpooled); + + props.put("javax.persistence.jtaDataSource", toUse); + + return props; + } + + @Override + public void modifiedService(ServiceReference<DataSourceFactory> reference, ManagedJPAEMFLocator service) { + } + + @Override + public void removedService(ServiceReference<DataSourceFactory> reference, ManagedJPAEMFLocator service) { + service.stop(); + + if (activeDsf.compareAndSet(reference, null)) { + Map<ServiceReference<DataSourceFactory>,ManagedJPAEMFLocator> tracked = dsfTracker.getTracked(); + if (!tracked.isEmpty()) { + Entry<ServiceReference<DataSourceFactory>, ManagedJPAEMFLocator> e = tracked.entrySet().iterator().next(); + updateService(e.getKey(), e.getValue()); + } + } + } + + private DataSource poolIfNecessary(Map<String, Object> resourceProviderProperties, DataSource unpooled) { + DataSource toUse; + + if (toBoolean(resourceProviderProperties, CONNECTION_POOLING_ENABLED, true)) { + HikariConfig hcfg = new HikariConfig(); + hcfg.setDataSource(unpooled); + + // Sizes + hcfg.setMaximumPoolSize(toInt(resourceProviderProperties, MAX_CONNECTIONS, 10)); + hcfg.setMinimumIdle(toInt(resourceProviderProperties, MIN_CONNECTIONS, 10)); + + // Timeouts + hcfg.setConnectionTimeout(toLong(resourceProviderProperties, CONNECTION_TIMEOUT, SECONDS.toMillis(30))); + hcfg.setIdleTimeout(toLong(resourceProviderProperties, IDLE_TIMEOUT, TimeUnit.MINUTES.toMillis(3))); + hcfg.setMaxLifetime(toLong(resourceProviderProperties, CONNECTION_LIFETIME, HOURS.toMillis(3))); + + toUse = new HikariDataSource(hcfg); + + } else { + toUse = unpooled; + } + return toUse; + } + + private boolean toBoolean(Map<String, Object> props, String key, boolean defaultValue) { + Object o = ofNullable(props) + .map(m -> m.get(key)) + .orElse(defaultValue); + + if (o instanceof Boolean) { + return ((Boolean) o).booleanValue(); + } else if(o instanceof String) 
{ + return Boolean.parseBoolean((String) o); + } else { + throw new IllegalArgumentException("The property " + key + " cannot be converted to a boolean"); + } + } + + private int toInt(Map<String, Object> props, String key, int defaultValue) { + + Object o = ofNullable(props) + .map(m -> m.get(key)) + .orElse(defaultValue); + + if (o instanceof Number) { + return ((Number) o).intValue(); + } else if(o instanceof String) { + return Integer.parseInt((String) o); + } else { + throw new IllegalArgumentException("The property " + key + " cannot be converted to an int"); + } + } + + private long toLong(Map<String, Object> props, String key, long defaultValue) { + + Object o = ofNullable(props) + .map(m -> m.get(key)) + .orElse(defaultValue); + + if (o instanceof Number) { + return ((Number) o).longValue(); + } else if(o instanceof String) { + return Long.parseLong((String) o); + } else { + throw new IllegalArgumentException("The property " + key + " cannot be converted to a long"); + } + } + +} \ No newline at end of file
Added: aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/ManagedJPAEMFLocator.java URL: http://svn.apache.org/viewvc/aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/ManagedJPAEMFLocator.java?rev=1738630&view=auto ============================================================================== --- aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/ManagedJPAEMFLocator.java (added) +++ aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/ManagedJPAEMFLocator.java Mon Apr 11 19:36:03 2016 @@ -0,0 +1,250 @@ +package org.apache.aries.tx.control.jpa.xa.impl; + +import static org.apache.aries.tx.control.jpa.xa.impl.ManagedServiceFactoryImpl.EMF_BUILDER_TARGET_FILTER; +import static org.osgi.framework.Constants.OBJECTCLASS; +import static org.osgi.service.jdbc.DataSourceFactory.JDBC_PASSWORD; +import static org.osgi.service.jpa.EntityManagerFactoryBuilder.JPA_UNIT_NAME; +import static org.osgi.service.jpa.EntityManagerFactoryBuilder.JPA_UNIT_PROVIDER; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Dictionary; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import javax.persistence.spi.PersistenceProvider; + +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleContext; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceReference; +import org.osgi.framework.ServiceRegistration; +import org.osgi.framework.wiring.BundleWire; +import org.osgi.framework.wiring.BundleWiring; +import org.osgi.service.cm.ConfigurationException; +import org.osgi.service.jpa.EntityManagerFactoryBuilder; +import org.osgi.service.transaction.control.TransactionControl; +import 
org.osgi.service.transaction.control.jpa.JPAEntityManagerProvider; +import org.osgi.util.tracker.ServiceTracker; +import org.osgi.util.tracker.ServiceTrackerCustomizer; + +public class ManagedJPAEMFLocator implements LifecycleAware, + ServiceTrackerCustomizer<EntityManagerFactoryBuilder, EntityManagerFactoryBuilder> { + + private final BundleContext context; + private final String pid; + private final Map<String, Object> jpaProperties; + private final Map<String, Object> providerProperties; + private final ServiceTracker<EntityManagerFactoryBuilder, EntityManagerFactoryBuilder> emfBuilderTracker; + + private final AtomicReference<EntityManagerFactoryBuilder> activeDsf = new AtomicReference<>(); + private final AtomicReference<ServiceRegistration<JPAEntityManagerProvider>> serviceReg = new AtomicReference<>(); + + public ManagedJPAEMFLocator(BundleContext context, String pid, Map<String, Object> jpaProperties, + Map<String, Object> providerProperties) throws InvalidSyntaxException, ConfigurationException { + this.context = context; + this.pid = pid; + this.jpaProperties = jpaProperties; + this.providerProperties = providerProperties; + + String unitName = (String) providerProperties.get(JPA_UNIT_NAME); + if (unitName == null) { + ManagedServiceFactoryImpl.LOG.error("The configuration {} must specify a persistence unit name", pid); + throw new ConfigurationException(JPA_UNIT_NAME, + "The configuration must specify a persistence unit name"); + } + + String targetFilter = (String) providerProperties.get(EMF_BUILDER_TARGET_FILTER); + if (targetFilter == null) { + targetFilter = "(" + JPA_UNIT_NAME + "=" + unitName + ")"; + } + + targetFilter = "(&(" + OBJECTCLASS + "=" + EntityManagerFactoryBuilder.class.getName() + ")" + targetFilter + ")"; + + this.emfBuilderTracker = new ServiceTracker<>(context, context.createFilter(targetFilter), this); + } + + public void start() { + emfBuilderTracker.open(); + } + + public void stop() { + emfBuilderTracker.close(); + } + + 
@Override + public EntityManagerFactoryBuilder addingService(ServiceReference<EntityManagerFactoryBuilder> reference) { + EntityManagerFactoryBuilder service = context.getService(reference); + + updateService(reference, service); + return service; + } + + private void updateService(ServiceReference<EntityManagerFactoryBuilder> reference, EntityManagerFactoryBuilder service) { + boolean setEMFB; + synchronized (this) { + setEMFB = activeDsf.compareAndSet(null, service); + } + + if (setEMFB) { + try { + JPAEntityManagerProvider jpaEM = new DelayedJPAEntityManagerProvider(t -> { + + Map<String, Object> props = new HashMap<String, Object>(jpaProperties); + + setupTransactionManager(props, t, reference); + + return new JPAEntityManagerProviderFactoryImpl().getProviderFor(service, + props, providerProperties); + }); + ServiceRegistration<JPAEntityManagerProvider> reg = context + .registerService(JPAEntityManagerProvider.class, jpaEM, getServiceProperties()); + if (!serviceReg.compareAndSet(null, reg)) { + throw new IllegalStateException("Unable to set the JDBC connection provider registration"); + } + } catch (Exception e) { + ManagedServiceFactoryImpl.LOG.error("An error occurred when creating the connection provider for {}.", pid, e); + activeDsf.compareAndSet(service, null); + } + } + } + + private void setupTransactionManager(Map<String, Object> props, TransactionControl txControl, + ServiceReference<EntityManagerFactoryBuilder> reference) { + String provider = (String) reference.getProperty(JPA_UNIT_PROVIDER); + + ServiceReference<PersistenceProvider> providerRef = getPersistenceProvider(provider); + + if(providerRef == null) { + // TODO log a warning and give up + return; + } + + Bundle providerBundle = providerRef.getBundle(); + Bundle txControlProviderBundle = context.getBundle(); + + try { + if("org.hibernate.jpa.HibernatePersistenceProvider".equals(provider)) { + + try{ + 
providerBundle.loadClass("org.hibernate.resource.transaction.TransactionCoordinatorBuilder"); + } catch (Exception e) { + BundleWiring wiring = providerBundle.adapt(BundleWiring.class); + providerBundle = wiring.getRequiredWires("osgi.wiring.package").stream() + .filter(bw -> "org.hibernate".equals(bw.getCapability().getAttributes().get("osgi.wiring.package"))) + .map(BundleWire::getProviderWiring) + .map(BundleWiring::getBundle) + .findFirst().get(); + } + + ClassLoader pluginLoader = getPluginLoader(providerBundle, txControlProviderBundle); + + Class<?> pluginClazz = pluginLoader.loadClass("org.apache.aries.tx.control.jpa.xa.hibernate.impl.HibernateTxControlPlatform"); + Object plugin = pluginClazz.getConstructor(TransactionControl.class) + .newInstance(txControl); + + props.put("hibernate.transaction.coordinator_class", plugin); + + } else { + // TODO log a warning and give up + return; + } + } catch (Exception e) { + //TODO log a warning and give up + e.printStackTrace(); + } + } + + private ClassLoader getPluginLoader(Bundle providerBundle, Bundle txControlProviderBundle) { + return new ClassLoader() { + + @Override + public Class<?> loadClass(String name) throws ClassNotFoundException { + if(name.startsWith("org.apache.aries.tx.control.jpa.xa.hibernate")) { + String resource = name.replace('.', '/') + ".class"; + + try (InputStream is = txControlProviderBundle.getResource(resource).openStream()) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); + byte[] b = new byte[4096]; + int read; + while ((read = is.read(b)) != -1) { + baos.write(b, 0, read); + } + byte[] clazzBytes = baos.toByteArray(); + return defineClass(name, clazzBytes, 0, clazzBytes.length, + ManagedJPAEMFLocator.class.getProtectionDomain()); + } catch (IOException e) { + throw new ClassNotFoundException("Unable to load class " + name, e); + } + } + + if(name.startsWith("org.apache.aries.tx.control") || + name.startsWith("org.osgi.service.transaction.control")) { + return 
txControlProviderBundle.loadClass(name); + } + return providerBundle.loadClass(name); + } + }; + } + + private ServiceReference<PersistenceProvider> getPersistenceProvider(String provider) { + if(provider == null) { + return null; + } + try { + return context.getServiceReferences(PersistenceProvider.class, + "(javax.persistence.provider=" + provider + ")").stream() + .findFirst() + .orElse(null); + } catch (InvalidSyntaxException e) { + //TODO log a warning + return null; + } + } + + private Dictionary<String, ?> getServiceProperties() { + Hashtable<String, Object> props = new Hashtable<>(); + providerProperties.keySet().stream().filter(s -> !JDBC_PASSWORD.equals(s)) + .forEach(s -> props.put(s, providerProperties.get(s))); + return props; + } + + @Override + public void modifiedService(ServiceReference<EntityManagerFactoryBuilder> reference, EntityManagerFactoryBuilder service) { + } + + @Override + public void removedService(ServiceReference<EntityManagerFactoryBuilder> reference, EntityManagerFactoryBuilder service) { + boolean dsfLeft; + ServiceRegistration<JPAEntityManagerProvider> oldReg = null; + synchronized (this) { + dsfLeft = activeDsf.compareAndSet(service, null); + if (dsfLeft) { + oldReg = serviceReg.getAndSet(null); + } + } + + if (oldReg != null) { + try { + oldReg.unregister(); + } catch (IllegalStateException ise) { + ManagedServiceFactoryImpl.LOG.debug("An exception occurred when unregistering a service for {}", pid); + } + } + try { + context.ungetService(reference); + } catch (IllegalStateException ise) { + ManagedServiceFactoryImpl.LOG.debug("An exception occurred when ungetting the service for {}", reference); + } + + if (dsfLeft) { + EntityManagerFactoryBuilder newEMFBuilder = emfBuilderTracker.getService(); + if (newEMFBuilder != null) { + updateService(reference, newEMFBuilder); + } + } + } +} \ No newline at end of file Added: 
aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/ManagedServiceFactoryImpl.java URL: http://svn.apache.org/viewvc/aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/ManagedServiceFactoryImpl.java?rev=1738630&view=auto ============================================================================== --- aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/ManagedServiceFactoryImpl.java (added) +++ aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/ManagedServiceFactoryImpl.java Mon Apr 11 19:36:03 2016 @@ -0,0 +1,227 @@ +package org.apache.aries.tx.control.jpa.xa.impl; + +import static java.lang.Integer.MAX_VALUE; +import static java.util.Arrays.asList; +import static java.util.Optional.ofNullable; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; +import static javax.persistence.spi.PersistenceUnitTransactionType.JTA; +import static org.osgi.service.jdbc.DataSourceFactory.JDBC_DATABASE_NAME; +import static org.osgi.service.jdbc.DataSourceFactory.JDBC_DATASOURCE_NAME; +import static org.osgi.service.jdbc.DataSourceFactory.JDBC_DESCRIPTION; +import static org.osgi.service.jdbc.DataSourceFactory.JDBC_NETWORK_PROTOCOL; +import static org.osgi.service.jdbc.DataSourceFactory.JDBC_PASSWORD; +import static org.osgi.service.jdbc.DataSourceFactory.JDBC_PORT_NUMBER; +import static org.osgi.service.jdbc.DataSourceFactory.JDBC_ROLE_NAME; +import static org.osgi.service.jdbc.DataSourceFactory.JDBC_SERVER_NAME; +import static org.osgi.service.jdbc.DataSourceFactory.JDBC_URL; +import static org.osgi.service.jdbc.DataSourceFactory.JDBC_USER; +import static org.osgi.service.jdbc.DataSourceFactory.OSGI_JDBC_DRIVER_CLASS; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Dictionary; +import 
java.util.Enumeration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import org.osgi.framework.BundleContext; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.service.cm.ConfigurationException; +import org.osgi.service.cm.ManagedServiceFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ManagedServiceFactoryImpl implements ManagedServiceFactory { + + static final Logger LOG = LoggerFactory.getLogger(ManagedServiceFactoryImpl.class); + + static final String DSF_TARGET_FILTER = "aries.dsf.target.filter"; + static final String EMF_BUILDER_TARGET_FILTER = "aries.emf.builder.target.filter"; + static final String JDBC_PROP_NAMES = "aries.jdbc.property.names"; + static final List<String> JDBC_PROPERTIES = asList(JDBC_DATABASE_NAME, JDBC_DATASOURCE_NAME, + JDBC_DESCRIPTION, JDBC_NETWORK_PROTOCOL, JDBC_PASSWORD, JDBC_PORT_NUMBER, JDBC_ROLE_NAME, JDBC_SERVER_NAME, + JDBC_URL, JDBC_USER); + static final String JPA_PROP_NAMES = "aries.jpa.property.names"; + + private final Map<String, LifecycleAware> managedInstances = new ConcurrentHashMap<>(); + + private final BundleContext context; + + public ManagedServiceFactoryImpl(BundleContext context) { + this.context = context; + } + + @Override + public String getName() { + return "Aries JPAEntityManagerProvider (Local only) service"; + } + + @Override + public void updated(String pid, Dictionary<String, ?> properties) throws ConfigurationException { + + Map<String, Object> propsMap = new HashMap<>(); + + Enumeration<String> keys = properties.keys(); + while (keys.hasMoreElements()) { + String key = keys.nextElement(); + propsMap.put(key, properties.get(key)); + } + + Properties jdbcProps = getJdbcProps(pid, propsMap); + Map<String, Object> jpaProps = getJPAProps(pid, propsMap); + + try { + LifecycleAware worker; + 
if(propsMap.containsKey(OSGI_JDBC_DRIVER_CLASS) || + propsMap.containsKey(DSF_TARGET_FILTER)) { + worker = new ManagedJPADataSourceSetup(context, pid, jdbcProps, jpaProps, propsMap); + } else { + if(!jdbcProps.isEmpty()) { + LOG.warn("The configuration {} contains raw JDBC configuration, but no osgi.jdbc.driver.class or aries.dsf.target.filter properties. No DataSourceFactory will be used byt this bundle, so the JPA provider must be able to directly create the datasource, and these configuration properties will likely be ignored. {}", + pid, jdbcProps.stringPropertyNames()); + } + worker = new ManagedJPAEMFLocator(context, pid, jpaProps, propsMap); + } + ofNullable(managedInstances.put(pid, worker)).ifPresent(LifecycleAware::stop); + worker.start(); + } catch (InvalidSyntaxException e) { + LOG.error("The configuration {} contained an invalid target filter {}", pid, e.getFilter()); + throw new ConfigurationException(DSF_TARGET_FILTER, "The target filter was invalid", e); + } + } + + public void stop() { + managedInstances.values().forEach(LifecycleAware::stop); + } + + @SuppressWarnings("unchecked") + private Properties getJdbcProps(String pid, Map<String, Object> properties) throws ConfigurationException { + + Object object = properties.getOrDefault(JDBC_PROP_NAMES, JDBC_PROPERTIES); + Collection<String> propnames; + if (object instanceof String) { + propnames = Arrays.asList(((String) object).split(",")); + } else if (object instanceof String[]) { + propnames = Arrays.asList((String[]) object); + } else if (object instanceof Collection) { + propnames = (Collection<String>) object; + } else { + LOG.error("The configuration {} contained an invalid list of JDBC property names", pid, object); + throw new ConfigurationException(JDBC_PROP_NAMES, + "The jdbc property names must be a String+ or comma-separated String"); + } + + Properties p = new Properties(); + + propnames.stream().filter(properties::containsKey) + .forEach(s -> p.setProperty(s, 
String.valueOf(properties.get(s)))); + + return p; + } + + @SuppressWarnings("unchecked") + private Map<String, Object> getJPAProps(String pid, Map<String, Object> properties) throws ConfigurationException { + + Object object = properties.getOrDefault(JPA_PROP_NAMES, new AllCollection()); + Collection<String> propnames; + if (object instanceof String) { + propnames = Arrays.asList(((String) object).split(",")); + } else if (object instanceof String[]) { + propnames = Arrays.asList((String[]) object); + } else if (object instanceof Collection) { + propnames = (Collection<String>) object; + } else { + LOG.error("The configuration {} contained an invalid list of JPA property names", pid, object); + throw new ConfigurationException(JDBC_PROP_NAMES, + "The jpa property names must be empty, a String+, or a comma-separated String list"); + } + + Map<String, Object> result = properties.keySet().stream() + .filter(propnames::contains) + .collect(toMap(identity(), properties::get)); + + result.putIfAbsent("javax.persistence.transactionType", JTA.name()); + + return result; + } + + @Override + public void deleted(String pid) { + ofNullable(managedInstances.remove(pid)) + .ifPresent(LifecycleAware::stop); + } + + private static class AllCollection implements Collection<String> { + + @Override + public int size() { + return MAX_VALUE; + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public boolean contains(Object o) { + return true; + } + + @Override + public Iterator<String> iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public <T> T[] toArray(T[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(String e) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean 
containsAll(Collection<?> c) { + return true; + } + + @Override + public boolean addAll(Collection<? extends String> c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection<?> c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection<?> c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + } +} Copied: aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XAEnabledTxContextBindingConnection.java (from r1738126, aries/trunk/tx-control/tx-control-provider-jdbc-xa/src/main/java/org/apache/aries/tx/control/jdbc/xa/impl/XAEnabledTxContextBindingConnection.java) URL: http://svn.apache.org/viewvc/aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XAEnabledTxContextBindingConnection.java?p2=aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XAEnabledTxContextBindingConnection.java&p1=aries/trunk/tx-control/tx-control-provider-jdbc-xa/src/main/java/org/apache/aries/tx/control/jdbc/xa/impl/XAEnabledTxContextBindingConnection.java&r1=1738126&r2=1738630&rev=1738630&view=diff ============================================================================== --- aries/trunk/tx-control/tx-control-provider-jdbc-xa/src/main/java/org/apache/aries/tx/control/jdbc/xa/impl/XAEnabledTxContextBindingConnection.java (original) +++ aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XAEnabledTxContextBindingConnection.java Mon Apr 11 19:36:03 2016 @@ -1,4 +1,4 @@ -package org.apache.aries.tx.control.jdbc.xa.impl; +package org.apache.aries.tx.control.jpa.xa.impl; import static org.osgi.service.transaction.control.TransactionStatus.NO_TRANSACTION; @@ -10,9 +10,8 @@ import javax.sql.DataSource; import 
javax.transaction.xa.XAResource; import org.apache.aries.tx.control.jdbc.common.impl.ConnectionWrapper; -import org.apache.aries.tx.control.jdbc.common.impl.ScopedConnectionWrapper; import org.apache.aries.tx.control.jdbc.common.impl.TxConnectionWrapper; -import org.osgi.service.transaction.control.LocalResource; +import org.apache.aries.tx.control.jdbc.xa.connection.impl.XAConnectionWrapper; import org.osgi.service.transaction.control.TransactionContext; import org.osgi.service.transaction.control.TransactionControl; import org.osgi.service.transaction.control.TransactionException; @@ -22,16 +21,12 @@ public class XAEnabledTxContextBindingCo private final TransactionControl txControl; private final UUID resourceId; private final DataSource dataSource; - private final boolean xaEnabled; - private final boolean localEnabled; public XAEnabledTxContextBindingConnection(TransactionControl txControl, DataSource dataSource, UUID resourceId, boolean xaEnabled, boolean localEnabled) { this.txControl = txControl; this.dataSource = dataSource; this.resourceId = resourceId; - this.xaEnabled = xaEnabled; - this.localEnabled = localEnabled; } @Override @@ -55,19 +50,14 @@ public class XAEnabledTxContextBindingCo try { if (txContext.getTransactionStatus() == NO_TRANSACTION) { - toClose = dataSource.getConnection(); - toReturn = new ScopedConnectionWrapper(toClose); - } else if (txContext.supportsXA() && xaEnabled) { + throw new TransactionException("The JTA DataSource cannot be used outside a transaction"); + } else if (txContext.supportsXA()) { toClose = dataSource.getConnection(); toReturn = new TxConnectionWrapper(toClose); txContext.registerXAResource(getXAResource(toClose)); - } else if (txContext.supportsLocal() && localEnabled) { - toClose = dataSource.getConnection(); - toReturn = new TxConnectionWrapper(toClose); - txContext.registerLocalResource(getLocalResource(toClose)); } else { throw new TransactionException( - "There is a transaction active, but it does not 
support local participants"); + "There is a transaction active, but it does not support XA participants"); } } catch (Exception sqle) { throw new TransactionException( @@ -99,29 +89,4 @@ public class XAEnabledTxContextBindingCo throw new IllegalArgumentException("The XAResource for the connection cannot be found"); } } - - private LocalResource getLocalResource(Connection conn) { - return new LocalResource() { - @Override - public void commit() throws TransactionException { - try { - conn.commit(); - } catch (SQLException e) { - throw new TransactionException( - "An error occurred when committing the connection", e); - } - } - - @Override - public void rollback() throws TransactionException { - try { - conn.rollback(); - } catch (SQLException e) { - throw new TransactionException( - "An error occurred when rolling back the connection", e); - } - } - - }; - } } Added: aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManager.java URL: http://svn.apache.org/viewvc/aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManager.java?rev=1738630&view=auto ============================================================================== --- aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManager.java (added) +++ aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManager.java Mon Apr 11 19:36:03 2016 @@ -0,0 +1,82 @@ +package org.apache.aries.tx.control.jpa.xa.impl; + +import static org.osgi.service.transaction.control.TransactionStatus.NO_TRANSACTION; + +import java.util.UUID; + +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.PersistenceException; + +import 
org.apache.aries.tx.control.jpa.common.impl.EntityManagerWrapper;
+import org.apache.aries.tx.control.jpa.common.impl.ScopedEntityManagerWrapper;
+import org.apache.aries.tx.control.jpa.common.impl.TxEntityManagerWrapper;
+import org.osgi.service.transaction.control.TransactionContext;
+import org.osgi.service.transaction.control.TransactionControl;
+import org.osgi.service.transaction.control.TransactionException;
+
+/**
+ * An {@link EntityManagerWrapper} whose delegate is looked up in, and bound to,
+ * the current {@link TransactionContext}.
+ *
+ * The first call made within a context creates an {@link EntityManager} from the
+ * factory, stores it in the context as a scoped value keyed by the resource id,
+ * and registers a post-completion callback that closes it. Later calls within the
+ * same context return the stored instance.
+ */
+public class XATxContextBindingEntityManager extends EntityManagerWrapper {
+
+	private final TransactionControl txControl;
+	private final UUID resourceId;
+	private final EntityManagerFactory emf;
+
+	/**
+	 * @param txControl  used to obtain the current transaction context
+	 * @param emf        factory used to create the real entity managers
+	 * @param resourceId key under which the entity manager is stored in the context
+	 */
+	public XATxContextBindingEntityManager(TransactionControl txControl,
+			EntityManagerFactory emf, UUID resourceId) {
+		this.txControl = txControl;
+		this.emf = emf;
+		this.resourceId = resourceId;
+	}
+
+	/**
+	 * Returns the entity manager bound to the current context, creating and
+	 * binding one if the context does not hold one yet.
+	 *
+	 * @return the entity manager stored in the current context for this resource
+	 * @throws TransactionException if there is no current context, if a transaction
+	 *         is active but does not support XA participants, or if the entity
+	 *         manager cannot be created or cannot join the transaction
+	 */
+	@Override
+	protected final EntityManager getRealEntityManager() {
+
+		TransactionContext txContext = txControl.getCurrentContext();
+
+		if (txContext == null) {
+			throw new TransactionException("The resource " + emf
+					+ " cannot be accessed outside of an active Transaction Context");
+		}
+
+		EntityManager existing = (EntityManager) txContext.getScopedValue(resourceId);
+
+		if (existing != null) {
+			return existing;
+		}
+
+		boolean inTransaction = txContext.getTransactionStatus() != NO_TRANSACTION;
+
+		// Checked before (and outside) the try block so the exception reaches the
+		// caller unchanged rather than being re-wrapped by the generic handler below.
+		if (inTransaction && !txContext.supportsXA()) {
+			throw new TransactionException(
+					"There is a transaction active, but it does not support XA participants");
+		}
+
+		EntityManager created = null;
+		EntityManager toReturn;
+
+		try {
+			created = emf.createEntityManager();
+			if (inTransaction) {
+				toReturn = new TxEntityManagerWrapper(created);
+				created.joinTransaction();
+			} else {
+				toReturn = new ScopedEntityManagerWrapper(created);
+			}
+		} catch (Exception e) {
+			// Nothing has been registered to close the entity manager yet, so
+			// release it here; a failure to close must not hide the root cause.
+			if (created != null) {
+				try {
+					created.close();
+				} catch (RuntimeException closeFailure) {
+					e.addSuppressed(closeFailure);
+				}
+			}
+			throw new TransactionException(
+					"There was a problem getting hold of an EntityManager",
+					e);
+		}
+
+		// Effectively final copy for use inside the callback.
+		EntityManager toClose = created;
+
+		txContext.postCompletion(x -> {
+			try {
+				toClose.close();
+			} catch (PersistenceException pe) {
+				// TODO log this
+			}
+		});
+
+		txContext.putScopedValue(resourceId, toReturn);
+
+		return toReturn;
+	}
+}
Added: aries/trunk/tx-control/tx-control-provider-jpa-xa/src/test/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManagerTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/tx-control/tx-control-provider-jpa-xa/src/test/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManagerTest.java?rev=1738630&view=auto
==============================================================================
--- aries/trunk/tx-control/tx-control-provider-jpa-xa/src/test/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManagerTest.java (added)
+++ aries/trunk/tx-control/tx-control-provider-jpa-xa/src/test/java/org/apache/aries/tx/control/jpa/xa/impl/XATxContextBindingEntityManagerTest.java Mon Apr 11 19:36:03 2016
@@ -0,0 +1,193 @@
+package org.apache.aries.tx.control.jpa.xa.impl;
+
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.withSettings;
+import static org.osgi.service.transaction.control.TransactionStatus.ACTIVE;
+import static org.osgi.service.transaction.control.TransactionStatus.NO_TRANSACTION;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.sql.XAConnection;
+import javax.transaction.xa.XAResource;
+
+import org.apache.aries.tx.control.jdbc.xa.connection.impl.XAConnectionWrapper;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.service.transaction.control.TransactionContext;
+import org.osgi.service.transaction.control.TransactionControl;
+import org.osgi.service.transaction.control.TransactionException;
+
+/**
+ * Unit tests for {@link XATxContextBindingEntityManager}, driven entirely by
+ * Mockito mocks of the transaction control service and the JPA factory.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class XATxContextBindingEntityManagerTest {
+
+	@Mock
+	TransactionControl control;
+
+	@Mock
+	TransactionContext context;
+
+	@Mock
+	EntityManagerFactory emf;
+
+	@Mock
+	EntityManager rawEm;
+
+	@Mock
+	XAResource xaResource;
+
+	/** Backing store for the scoped values of the mocked context. */
+	Map<Object, Object> variables = new HashMap<>();
+
+	UUID id = UUID.randomUUID();
+
+	XATxContextBindingEntityManager em;
+
+	@Before
+	public void setUp() throws SQLException {
+		// The factory hands out the raw entity manager once and null afterwards.
+		Mockito.when(emf.createEntityManager()).thenReturn(rawEm).thenReturn(null);
+
+		// Scoped-value writes and reads on the mocked context go through the map.
+		Mockito.doAnswer(invocation -> {
+			Object[] args = invocation.getArguments();
+			return variables.put(args[0], args[1]);
+		}).when(context).putScopedValue(Mockito.any(), Mockito.any());
+		Mockito.when(context.getScopedValue(Mockito.any()))
+			.thenAnswer(invocation -> variables.get(invocation.getArguments()[0]));
+
+		em = new XATxContextBindingEntityManager(control, emf, id);
+	}
+
+	/** Makes the mocked context current and reports that no transaction is running. */
+	private void setupNoTransaction() {
+		Mockito.when(control.getCurrentContext()).thenReturn(context);
+		Mockito.when(context.getTransactionStatus()).thenReturn(NO_TRANSACTION);
+	}
+
+	/** Makes the mocked context current with an active, XA-capable transaction. */
+	private void setupActiveTransaction() {
+		Mockito.when(control.getCurrentContext()).thenReturn(context);
+		Mockito.when(context.supportsXA()).thenReturn(true);
+		Mockito.when(context.getTransactionStatus()).thenReturn(ACTIVE);
+	}
+
+	/** Calls through the entity manager under test twice. */
+	private void useEntityManagerTwice() {
+		em.isOpen();
+		em.isOpen();
+	}
+
+	/**
+	 * Uses the entity manager twice and verifies that both calls reached the raw
+	 * entity manager, that it joined the transaction once, and that one
+	 * post-completion callback was registered.
+	 */
+	private void verifyJoinedOnceAcrossTwoCalls() {
+		useEntityManagerTwice();
+
+		Mockito.verify(rawEm, times(2)).isOpen();
+		Mockito.verify(rawEm).joinTransaction();
+
+		Mockito.verify(context).postCompletion(Mockito.any());
+	}
+
+
+	@Test(expected=TransactionException.class)
+	public void testUnscoped() throws SQLException {
+		em.isOpen();
+	}
+
+	@Test
+	public void testNoTransaction() throws SQLException {
+		setupNoTransaction();
+
+		useEntityManagerTwice();
+
+		Mockito.verify(rawEm, times(2)).isOpen();
+		Mockito.verify(rawEm, Mockito.never()).getTransaction();
+		Mockito.verify(context, Mockito.never()).registerXAResource(Mockito.any());
+
+		Mockito.verify(context).postCompletion(Mockito.any());
+	}
+
+	@Test
+	public void testActiveTransactionStraightXAConnection() throws SQLException {
+
+		// A connection mock that also implements XAConnection.
+		Connection connection = Mockito.mock(Connection.class,
+				withSettings().extraInterfaces(XAConnection.class));
+		XAConnection asXa = (XAConnection) connection;
+		Mockito.when(asXa.getXAResource()).thenReturn(xaResource);
+
+		Mockito.when(rawEm.unwrap(Connection.class)).thenReturn(connection);
+
+		setupActiveTransaction();
+
+		verifyJoinedOnceAcrossTwoCalls();
+	}
+
+	@Test
+	public void testActiveTransactionWrappedXAConnection() throws SQLException {
+
+		XAConnection xaConnection = Mockito.mock(XAConnection.class);
+		Connection physical = Mockito.mock(Connection.class);
+		Mockito.when(xaConnection.getXAResource()).thenReturn(xaResource);
+		Mockito.when(xaConnection.getConnection()).thenReturn(physical);
+
+		XAConnectionWrapper wrapper = new XAConnectionWrapper(xaConnection);
+
+		Mockito.when(rawEm.unwrap(Connection.class)).thenReturn(wrapper);
+
+		setupActiveTransaction();
+
+		verifyJoinedOnceAcrossTwoCalls();
+	}
+
+	@Test
+	public void testActiveTransactionUnwrappableXAConnection() throws SQLException {
+
+		XAConnection xaConnection = Mockito.mock(XAConnection.class);
+		Mockito.when(xaConnection.getXAResource()).thenReturn(xaResource);
+		Connection connection = Mockito.mock(Connection.class);
+		Mockito.when(connection.unwrap(XAConnection.class)).thenReturn(xaConnection);
+		Mockito.when(connection.isWrapperFor(XAConnection.class)).thenReturn(true);
+
+		Mockito.when(rawEm.unwrap(Connection.class)).thenReturn(connection);
+
+		setupActiveTransaction();
+
+		verifyJoinedOnceAcrossTwoCalls();
+	}
+
+	@Test
+	public void testActiveTransactionUnwrappableXAConnectionWrapper() throws SQLException {
+
+		XAConnection xaConnection = Mockito.mock(XAConnection.class);
+		Mockito.when(xaConnection.getXAResource()).thenReturn(xaResource);
+		Connection connection = Mockito.mock(Connection.class);
+		XAConnectionWrapper wrapper = new XAConnectionWrapper(xaConnection);
+		Mockito.when(connection.unwrap(XAConnectionWrapper.class)).thenReturn(wrapper);
+		Mockito.when(connection.isWrapperFor(XAConnectionWrapper.class)).thenReturn(true);
+
+		Mockito.when(rawEm.unwrap(Connection.class)).thenReturn(connection);
+
+		setupActiveTransaction();
+
+		verifyJoinedOnceAcrossTwoCalls();
+	}
+
+	@Test(expected=TransactionException.class)
+	public void testActiveTransactionNoXA() throws SQLException {
+		setupActiveTransaction();
+
+		// Override the XA support reported by the active-transaction setup.
+		Mockito.when(context.supportsXA()).thenReturn(false);
+		em.isOpen();
+	}
+
+}
