This is an automated email from the ASF dual-hosted git repository. rgoers pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/flume.git
commit eee179a09df405c1ab55ae25a53b76ca1050bb97 Author: Ralph Goers <[email protected]> AuthorDate: Fri Sep 23 00:10:42 2022 -0700 FLUME-3437 - Improve JMSSource validation --- .../flume/source/jms/JMSMessageConsumer.java | 16 +---- .../flume/source/jms/JMSMessageConverter.java | 6 +- .../org/apache/flume/source/jms/JMSSource.java | 68 +++++++++++++--------- .../flume/source/jms/TestIntegrationActiveMQ.java | 1 + .../org/apache/flume/source/jms/TestJMSSource.java | 1 + 5 files changed, 46 insertions(+), 46 deletions(-) diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java index 5375bd030..84409be67 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java @@ -35,14 +35,11 @@ import javax.jms.Session; import javax.jms.Topic; import javax.naming.InitialContext; import javax.naming.NamingException; -import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; class JMSMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(JMSMessageConsumer.class); - private static final String JAVA_SCHEME = "java"; private final int batchSize; private final long pollTimeout; @@ -102,14 +99,7 @@ class JMSMessageConsumer { throw new IllegalStateException(String.valueOf(destinationType)); } } else { - try { - URI uri = new URI(destinationName); - String scheme = uri.getScheme(); - assertTrue(scheme == null || scheme.equals(JAVA_SCHEME), - "Unsupported JNDI URI: " + destinationName); - } catch (URISyntaxException ex) { - logger.warn("Invalid JNDI URI - {}", destinationName); - } + JMSSource.verifyContext(destinationName); destination = (Destination) initialContext.lookup(destinationName); } } catch (JMSException e) { @@ -220,8 +210,4 @@ class JMSMessageConsumer { logger.error("Could not destroy connection", e); } } - - private void assertTrue(boolean arg, String msg) { - Preconditions.checkArgument(arg, msg); - } } diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConverter.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConverter.java index 7d02809b4..31a15b874 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConverter.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConverter.java @@ -39,13 +39,13 @@ import org.apache.flume.annotations.InterfaceStability; @InterfaceStability.Stable public interface JMSMessageConverter { - public List<Event> convert(Message message) throws JMSException; + List<Event> convert(Message message) throws JMSException; /** * Implementors of JMSMessageConverter must either provide * a suitable builder or implement the Configurable interface. */ - public interface Builder { - public JMSMessageConverter build(Context context); + interface Builder { + JMSMessageConverter build(Context context); } } diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java index 216225871..43df14518 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java @@ -21,6 +21,8 @@ import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Properties; @@ -56,6 +58,7 @@ import com.google.common.io.Files; public class JMSSource extends AbstractPollableSource implements BatchSizeSupported { private static final Logger logger = LoggerFactory.getLogger(JMSSource.class); private static final String JAVA_SCHEME = "java"; + public static final String JNDI_ALLOWED_PROTOCOLS = "JndiAllowedProtocols"; // setup by constructor private final InitialContextFactory initialContextFactory; @@ -82,6 +85,7 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor private int jmsExceptionCounter; private InitialContext initialContext; + private static List<String> allowedSchemes = getAllowedProtocols(); public JMSSource() { this(new InitialContextFactory()); @@ -92,6 +96,34 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor this.initialContextFactory = initialContextFactory; } + private static List<String> getAllowedProtocols() { + String allowed = System.getProperty(JNDI_ALLOWED_PROTOCOLS, null); + if (allowed == null) { + return Collections.singletonList(JAVA_SCHEME); + } else { + String[] items = allowed.split(","); + List<String> schemes = new ArrayList<>(); + schemes.add(JAVA_SCHEME); + for (String item : items) { + if (!item.equals(JAVA_SCHEME)) { + schemes.add(item.trim()); + } + } + return schemes; + } + } + + public static void verifyContext(String location) { + try { + String scheme = new URI(location).getScheme(); + if (scheme != null && !allowedSchemes.contains(scheme)) { + throw new IllegalArgumentException("Invalid JNDI URI: " + location); + } + } catch (URISyntaxException ex) { + logger.trace("{}} is not a valid URI", location); + } + } + @Override protected void doConfigure(Context context) throws FlumeException { sourceCounter = new SourceCounter(getName()); @@ -100,14 +132,7 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY, "").trim(); providerUrl = context.getString(JMSSourceConfiguration.PROVIDER_URL, "").trim(); - try { - URI uri = new URI(providerUrl); - String scheme = uri.getScheme(); - assertTrue(scheme == null || scheme.equals(JAVA_SCHEME), - "Unsupported JNDI URI: " + providerUrl); - } catch (URISyntaxException ex) { - logger.warn("Invalid JNDI URI - {}", providerUrl); - } + verifyContext(providerUrl); destinationName = context.getString(JMSSourceConfiguration.DESTINATION_NAME, "").trim(); @@ -190,14 +215,7 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor String connectionFactoryName = context.getString( JMSSourceConfiguration.CONNECTION_FACTORY, JMSSourceConfiguration.CONNECTION_FACTORY_DEFAULT).trim(); - try { - URI uri = new URI(connectionFactoryName); - String scheme = uri.getScheme(); - assertTrue(scheme == null || scheme.equals(JAVA_SCHEME), - "Unsupported JNDI URI: " + connectionFactoryName); - } catch (URISyntaxException ex) { - logger.warn("Invalid JNDI URI - {}", connectionFactoryName); - } + verifyContext(connectionFactoryName); assertNotEmpty(initialContextFactoryName, String.format( "Initial Context Factory is empty. This is specified by %s", @@ -291,10 +309,6 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor Preconditions.checkArgument(!arg.isEmpty(), msg); } - private void assertTrue(boolean arg, String msg) { - Preconditions.checkArgument(arg, msg); - } - @Override protected synchronized Status doProcess() throws EventDeliveryException { boolean error = true; @@ -322,14 +336,12 @@ public class JMSSource extends AbstractPollableSource implements BatchSizeSuppor sourceCounter.incrementChannelWriteFail(); } catch (JMSException jmsException) { logger.warn("JMSException consuming events", jmsException); - if (++jmsExceptionCounter > errorThreshold) { - if (consumer != null) { - logger.warn("Exceeded JMSException threshold, closing consumer"); - sourceCounter.incrementEventReadFail(); - consumer.rollback(); - consumer.close(); - consumer = null; - } + if (++jmsExceptionCounter > errorThreshold && consumer != null) { + logger.warn("Exceeded JMSException threshold, closing consumer"); + sourceCounter.incrementEventReadFail(); + consumer.rollback(); + consumer.close(); + consumer = null; } } catch (Throwable throwable) { logger.error("Unexpected error processing events", throwable); diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java index e13502e2d..10320d749 100644 --- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java +++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java @@ -90,6 +90,7 @@ public class TestIntegrationActiveMQ { private final String jmsPassword; public TestIntegrationActiveMQ(TestMode testMode) { + System.setProperty(JMSSource.JNDI_ALLOWED_PROTOCOLS, "tcp"); LOGGER.info("Testing with test mode {}", testMode); switch (testMode) { diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java index 6414428aa..e4c8cb442 100644 --- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java +++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java @@ -64,6 +64,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { @SuppressWarnings("unchecked") @Override void afterSetup() throws Exception { + System.setProperty(JMSSource.JNDI_ALLOWED_PROTOCOLS, "dummy"); baseDir = Files.createTempDir(); passwordFile = new File(baseDir, "password"); Assert.assertTrue(passwordFile.createNewFile());
