http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-storm/pom.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-storm/pom.xml b/streams-runtimes/streams-runtime-storm/pom.xml deleted file mode 100644 index ded3efc..0000000 --- a/streams-runtimes/streams-runtime-storm/pom.xml +++ /dev/null @@ -1,124 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, - ~ software distributed under the License is distributed on an - ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ~ KIND, either express or implied. See the License for the - ~ specific language governing permissions and limitations - ~ under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <artifactId>streams-runtimes</artifactId> - <groupId>org.apache.streams</groupId> - <version>0.4-incubating-SNAPSHOT</version> - </parent> - <artifactId>streams-runtime-storm</artifactId> - <name>${project.artifactId}</name> - <description>Apache Streams Runtimes</description> - - <properties> - <storm.version>0.9.1-incubating</storm.version> - <scala.version>2.9.2</scala.version> - <zkclient.version>0.4</zkclient.version> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-config</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-core</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-util</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-collections4</artifactId> - <version>4.0</version> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-core</artifactId> - <version>${storm.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>jcl-over-slf4j</artifactId> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>${scala.version}</version> - <scope>compile</scope> - <type>jar</type> - </dependency> - <dependency> - <groupId>com.101tec</groupId> - <artifactId>zkclient</artifactId> - <version>${zkclient.version}</version> - <scope>compile</scope> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-testing</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - </dependencies> - - <build> - <sourceDirectory>src/main/java</sourceDirectory> - <testSourceDirectory>src/test/java</testSourceDirectory> - <resources> - <resource> - <directory>src/main/resources</directory> - </resource> - </resources> - <testResources> - <testResource> - <directory>src/test/resources</directory> - </testResource> - </testResources> - </build> -</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java ---------------------------------------------------------------------- diff --git a/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java b/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java index 8f22450..6344c3c 100644 --- a/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java +++ b/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java @@ -40,47 +40,50 @@ import java.util.Set; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; +/** + * Test validity of documents vs schemas. + */ public class SchemaValidationTest { - private final static Logger LOGGER = LoggerFactory.getLogger(SchemaValidationTest.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SchemaValidationTest.class); - private final static ObjectMapper MAPPER = new ObjectMapper(); + private static final ObjectMapper MAPPER = new ObjectMapper(); - /** - * Tests that activities matching core-ex* can be parsed by apache streams - * - * @throws Exception - */ - @Test - public void validateToSchema() throws Exception { + /** + * Tests that activities matching core-ex* can be parsed by apache streams. + * + * @throws Exception Test Exception + */ + @Test + public void testValidateToSchema() throws Exception { - JsonSchemaFactory factory = new JsonSchemaFactory(); + JsonSchemaFactory factory = new JsonSchemaFactory(); - InputStream testActivityFolderStream = SchemaValidationTest.class.getClassLoader() - .getResourceAsStream("activities"); - List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); + InputStream testActivityFolderStream = SchemaValidationTest.class.getClassLoader() + .getResourceAsStream("activities"); + List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); - for (String file : files) { - if( !file.startsWith(".") ) { + for (String file : files) { + if ( !file.startsWith(".") ) { - LOGGER.info("Test File: activities/" + file); - String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/activities/" + file))); - LOGGER.info("Test Document JSON: " + testFileString); - JsonNode testNode = MAPPER.readValue(testFileString, ObjectNode.class); - LOGGER.info("Test Document Object:" + testNode); - LOGGER.info("Test Schema File: " + "target/classes/verbs/" + file); - String testSchemaString = new String(Files.readAllBytes(Paths.get("target/classes/verbs/" + file))); - LOGGER.info("Test Schema JSON: " + testSchemaString); - JsonNode testSchemaNode = MAPPER.readValue(testFileString, ObjectNode.class); - LOGGER.info("Test Schema Object:" + testSchemaNode); - JsonSchema testSchema = factory.getSchema(testSchemaNode); - LOGGER.info("Test Schema:" + testSchema); + LOGGER.info("Test File: activities/" + file); + String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/activities/" + file))); + LOGGER.info("Test Document JSON: " + testFileString); + JsonNode testNode = MAPPER.readValue(testFileString, ObjectNode.class); + LOGGER.info("Test Document Object:" + testNode); + LOGGER.info("Test Schema File: " + "target/classes/verbs/" + file); + String testSchemaString = new String(Files.readAllBytes(Paths.get("target/classes/verbs/" + file))); + LOGGER.info("Test Schema JSON: " + testSchemaString); + JsonNode testSchemaNode = MAPPER.readValue(testFileString, ObjectNode.class); + LOGGER.info("Test Schema Object:" + testSchemaNode); + JsonSchema testSchema = factory.getSchema(testSchemaNode); + LOGGER.info("Test Schema:" + testSchema); - Set<ValidationMessage> errors = testSchema.validate(testNode); - assertThat(errors.size(), is(0)); + Set<ValidationMessage> errors = testSchema.validate(testNode); + assertThat(errors.size(), is(0)); - } - } + } } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java ---------------------------------------------------------------------- diff --git a/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java b/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java index b1b5824..8500efd 100644 --- a/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java +++ b/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java @@ -33,103 +33,106 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; +/** + * Tests that activities matching core-ex* can be parsed by apache streams. + */ public class ExamplesSerDeIT { - private final static Logger LOGGER = LoggerFactory.getLogger(ExamplesSerDeIT.class); - - private final static ObjectMapper MAPPER = new ObjectMapper(); - - /** - * Tests that activities matching core-ex* can be parsed by apache streams - * - * @throws Exception - */ - @Test - public void testCoreSerDe() throws Exception { - - InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader() - .getResourceAsStream("w3c/activitystreams-master/test"); - List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); - - for (String file : files) { - if( !file.startsWith(".") && file.contains("core-ex") ) { - LOGGER.info("File: activitystreams-master/test/" + file); - String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file))); - LOGGER.info("Content: " + testFileString); - ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class); - LOGGER.info("Object:" + testFileObjectNode); - } - } + private static final Logger LOGGER = LoggerFactory.getLogger(ExamplesSerDeIT.class); + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + /** + * Tests that activities matching core-ex* can be parsed by apache streams. + * + * @throws Exception test exception + */ + @Test + public void testCoreSerDe() throws Exception { + + InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader() + .getResourceAsStream("w3c/activitystreams-master/test"); + List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); + + for (String file : files) { + if ( !file.startsWith(".") && file.contains("core-ex") ) { + LOGGER.info("File: activitystreams-master/test/" + file); + String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file))); + LOGGER.info("Content: " + testFileString); + ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class); + LOGGER.info("Object:" + testFileObjectNode); + } } - - /** - * Tests that activities matching simple* can be parsed by apache streams - * - * @throws Exception - */ - @Test - public void testSimpleSerDe() throws Exception { - - InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader() - .getResourceAsStream("w3c/activitystreams-master/test"); - List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); - - for (String file : files) { - if( !file.startsWith(".") && file.contains("simple") ) { - LOGGER.info("File: activitystreams-master/test/" + file); - String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file))); - LOGGER.info("Content: " + testFileString); - ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class); - LOGGER.info("Object:" + testFileObjectNode); - } - } + } + + /** + * Tests that activities matching simple* can be parsed by apache streams. + * + * @throws Exception test exception + */ + @Test + public void testSimpleSerDe() throws Exception { + + InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader() + .getResourceAsStream("w3c/activitystreams-master/test"); + List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); + + for (String file : files) { + if ( !file.startsWith(".") && file.contains("simple") ) { + LOGGER.info("File: activitystreams-master/test/" + file); + String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file))); + LOGGER.info("Content: " + testFileString); + ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class); + LOGGER.info("Object:" + testFileObjectNode); + } } - - /** - * Tests that activities matching vocabulary-ex* can be parsed by apache streams - * - * @throws Exception - */ - @Ignore - @Test - public void testVocabularySerDe() throws Exception { - - InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader() - .getResourceAsStream("w3c/activitystreams-master/test"); - List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); - - for (String file : files) { - if( !file.startsWith(".") && file.contains("vocabulary-ex") ) { - LOGGER.info("File: activitystreams-master/test/" + file); - String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file))); - LOGGER.info("Content: " + testFileString); - ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class); - LOGGER.info("Object:" + testFileObjectNode); - } - } + } + + /** + * Tests that activities matching vocabulary-ex* can be parsed by apache streams. + * + * @throws Exception test exception + */ + @Ignore + @Test + public void testVocabularySerDe() throws Exception { + + InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader() + .getResourceAsStream("w3c/activitystreams-master/test"); + List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); + + for (String file : files) { + if ( !file.startsWith(".") && file.contains("vocabulary-ex") ) { + LOGGER.info("File: activitystreams-master/test/" + file); + String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file))); + LOGGER.info("Content: " + testFileString); + ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class); + LOGGER.info("Object:" + testFileObjectNode); + } } - - /** - * Tests that activities expect to fail cannot be parsed by apache streams - * - * @throws Exception - */ - @Ignore - @Test - public void testFailSerDe() throws Exception { - - InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader() - .getResourceAsStream("w3c/activitystreams-master/test/fail"); - List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); - - for (String file : files) { - if( !file.startsWith(".") && file.contains("vocabulary-ex") ) { - LOGGER.info("File: activitystreams-master/test/fail/" + file); - String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file))); - LOGGER.info("Content: " + testFileString); - ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class); - LOGGER.info("Object:" + testFileObjectNode); - } - } + } + + /** + * Tests that activities expect to fail cannot be parsed by apache streams. + * + * @throws Exception test exception + */ + @Ignore + @Test + public void testFailSerDe() throws Exception { + + InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader() + .getResourceAsStream("w3c/activitystreams-master/test/fail"); + List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); + + for (String file : files) { + if ( !file.startsWith(".") && file.contains("vocabulary-ex") ) { + LOGGER.info("File: activitystreams-master/test/fail/" + file); + String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file))); + LOGGER.info("Content: " + testFileString); + ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class); + LOGGER.info("Object:" + testFileObjectNode); + } } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java index 6037f28..514c851 100644 --- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java +++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java @@ -15,126 +15,116 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.util; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.management.*; import java.lang.management.ManagementFactory; import java.util.Queue; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import javax.management.InstanceAlreadyExistsException; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; /** * Common utilities for Streams components. */ public class ComponentUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(ComponentUtils.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ComponentUtils.class); - /** - * Certain types of queues will fail to {@link java.util.Queue#offer(Object)} an item due to many factors - * depending on the type of queue. <code>offerUntilSuccess</code> will not return until the item has been - * successfully queued onto the desired queue - * @param entry item to queue - * @param queue queue to add the entry to - * @param <T> - */ - public static <T> void offerUntilSuccess(T entry, Queue<T> queue) { - boolean success; - do { - success = queue.offer(entry); - Thread.yield(); - } - while( !success ); + /** + * Certain types of queues will fail to {@link java.util.Queue#offer(Object)} an item due to many factors + * depending on the type of queue. <code>offerUntilSuccess</code> will not return until the item has been + * successfully queued onto the desired queue + * @param entry item to queue + * @param queue queue to add the entry to + * @param <T> type + */ + public static <T> void offerUntilSuccess(T entry, Queue<T> queue) { + boolean success; + do { + success = queue.offer(entry); + Thread.yield(); } + while ( !success ); + } - /** - * Certain types of queues will return null when calling {@link java.util.Queue#poll()} due to many factors depending - * on the type of queue. <code>pollWhileNotEmpty</code> will poll the queue until an item from the queue is returned - * or the queue is empty. If the queue is empty it will return NULL. - * @param queue - * @param <T> - * @return - */ - public static <T> T pollWhileNotEmpty(Queue<T> queue) { - T item = queue.poll(); - while(!queue.isEmpty() && item == null) { - Thread.yield(); - item = queue.poll(); - } - return item; + /** + * Certain types of queues will return null when calling {@link java.util.Queue#poll()} due to many factors depending + * on the type of queue. <code>pollWhileNotEmpty</code> will poll the queue until an item from the queue is returned + * or the queue is empty. If the queue is empty it will return NULL. + * @param queue queue to read the entry from + * @param <T> type + * @return result + */ + public static <T> T pollWhileNotEmpty(Queue<T> queue) { + T item = queue.poll(); + while (!queue.isEmpty() && item == null) { + Thread.yield(); + item = queue.poll(); } + return item; + } - - public static String pollUntilStringNotEmpty(Queue queue) { - - String result = null; - do { - synchronized( ComponentUtils.class ) { - try { - result = (String) queue.remove(); - } catch( Exception e ) {} - } - Thread.yield(); + /** + * Attempts to safely {@link java.util.concurrent.ExecutorService#shutdown()} + * and {@link java.util.concurrent.ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)} + * of an {@link java.util.concurrent.ExecutorService}. + * @param stream service to be shutdown + * @param initialWait time in seconds to wait for currently running threads to finish execution + * @param secondaryWait time in seconds to wait for running threads that did not terminate to acknowledge their forced termination + */ + public static void shutdownExecutor(ExecutorService stream, int initialWait, int secondaryWait) { + stream.shutdown(); + try { + if (!stream.awaitTermination(initialWait, TimeUnit.SECONDS)) { + stream.shutdownNow(); + if (!stream.awaitTermination(secondaryWait, TimeUnit.SECONDS)) { + LOGGER.error("Executor Service did not terminate"); } - while( result == null && !StringUtils.isNotEmpty(result) ); - - return result; + } + } catch (InterruptedException ie) { + stream.shutdownNow(); + Thread.currentThread().interrupt(); } + } - /** - * Attempts to safely {@link java.util.concurrent.ExecutorService#shutdown()} and {@link java.util.concurrent.ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)} - * of an {@link java.util.concurrent.ExecutorService}. - * @param stream service to be shutdown - * @param initialWait time in seconds to wait for currently running threads to finish execution - * @param secondaryWait time in seconds to wait for running threads that did not terminate in the first wait to acknowledge their forced termination - */ - public static void shutdownExecutor(ExecutorService stream, int initialWait, int secondaryWait) { - stream.shutdown(); - try { - if (!stream.awaitTermination(initialWait, TimeUnit.SECONDS)) { - stream.shutdownNow(); - if (!stream.awaitTermination(secondaryWait, TimeUnit.SECONDS)) { - LOGGER.error("Executor Service did not terminate"); - } - } - } catch (InterruptedException ie) { - stream.shutdownNow(); - Thread.currentThread().interrupt(); - } + /** + * Removes all mbeans registered undered a specific domain. Made specificly to clean up at unit tests + * @param domain mbean domain + */ + public static void removeAllMBeansOfDomain(String domain) throws Exception { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + domain = domain.endsWith(":") ? domain : domain + ":"; + ObjectName objectName = new ObjectName(domain + "*"); + Set<ObjectName> mbeanNames = mbs.queryNames(objectName, null); + for (ObjectName name : mbeanNames) { + mbs.unregisterMBean(name); } + } - /** - * Removes all mbeans registered undered a specific domain. Made specificly to clean up at unit tests - * @param domain - */ - public static void removeAllMBeansOfDomain(String domain) throws Exception { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - domain = domain.endsWith(":") ? domain : domain+":"; - ObjectName objectName = new ObjectName(domain+"*"); - Set<ObjectName> mbeanNames = mbs.queryNames(objectName, null); - for(ObjectName name : mbeanNames) { - mbs.unregisterMBean(name); - } - } - - /** - * Attempts to register an object with local MBeanServer. Throws runtime exception on errors. - * @param name name to register bean with - * @param mbean mbean to register - */ - public static <V> void registerLocalMBean(String name, V mbean) { - try { - ObjectName objectName = new ObjectName(name); - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbs.registerMBean(mbean, objectName); - } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { - LOGGER.error("Failed to register MXBean : {}", e); - throw new RuntimeException(e); - } + /** + * Attempts to register an object with local MBeanServer. Throws runtime exception on errors. + * @param name name to register bean with + * @param mbean mbean to register + */ + public static <V> void registerLocalMBean(String name, V mbean) { + try { + ObjectName objectName = new ObjectName(name); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + mbs.registerMBean(mbean, objectName); + } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException ex) { + LOGGER.error("Failed to register MXBean : {}", ex); + throw new RuntimeException(ex); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/DateUtil.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/DateUtil.java b/streams-util/src/main/java/org/apache/streams/util/DateUtil.java deleted file mode 100644 index 7bbb8e9..0000000 --- a/streams-util/src/main/java/org/apache/streams/util/DateUtil.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.util; - -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; - -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; - - -/* - * - * If you can think of a better way, feel free to implement. This was a great class that I found that - * solves the majority of the issue I was dealing with. - * - * smashew 11=13=2012 - * - * Site: - * http://stackoverflow.com/questions/3389348/parse-any-date-in-java - */ - -public class DateUtil -{ - - private static final String REGEX_ONLY_NUMBERS = "[0-9]+"; - - private static final Map<String, String> DATE_FORMAT_REGEXPS = new HashMap<String, String>() - { - private static final long serialVersionUID = 1L; - { - put("^\\d{8}$", "yyyyMMdd"); - put("^\\d{1,2}-\\d{1,2}-\\d{4}$", "dd-MM-yyyy"); - put("^\\d{4}-\\d{1,2}-\\d{1,2}$", "yyyy-MM-dd"); - put("^\\d{1,2}/\\d{1,2}/\\d{4}$", "MM/dd/yyyy"); - put("^\\d{4}/\\d{1,2}/\\d{1,2}$", "yyyy/MM/dd"); - put("^\\d{1,2}\\s[a-z]{3}\\s\\d{4}$", "dd MMM yyyy"); - put("^\\d{1,2}\\s[a-z]{4,}\\s\\d{4}$", "dd MMMM yyyy"); - put("^\\d{12}$", "yyyyMMddHHmm"); - put("^\\d{8}\\s\\d{4}$", "yyyyMMdd HHmm"); - put("^\\d{1,2}-\\d{1,2}-\\d{4}\\s\\d{1,2}:\\d{2}$", "dd-MM-yyyy HH:mm"); - put("^\\d{4}-\\d{1,2}-\\d{1,2}\\s\\d{1,2}:\\d{2}$", "yyyy-MM-dd HH:mm"); - put("^\\d{1,2}/\\d{1,2}/\\d{4}\\s\\d{1,2}:\\d{2}$", "MM/dd/yyyy HH:mm"); - put("^\\d{4}/\\d{1,2}/\\d{1,2}\\s\\d{1,2}:\\d{2}$", "yyyy/MM/dd HH:mm"); - put("^\\d{1,2}\\s[a-z]{3}\\s\\d{4}\\s\\d{1,2}:\\d{2}$", "dd MMM yyyy HH:mm"); - put("^\\d{1,2}\\s[a-z]{4,}\\s\\d{4}\\s\\d{1,2}:\\d{2}$", "dd MMMM yyyy HH:mm"); - put("^\\d{14}$", "yyyyMMddHHmmss"); - put("^\\d{8}\\s\\d{6}$", "yyyyMMdd HHmmss"); - put("^\\d{1,2}-\\d{1,2}-\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "dd-MM-yyyy HH:mm:ss"); - put("^\\d{4}-\\d{1,2}-\\d{1,2}\\s\\d{1,2}:\\d{2}:\\d{2}$", "yyyy-MM-dd HH:mm:ss"); - put("^\\d{1,2}/\\d{1,2}/\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "MM/dd/yyyy HH:mm:ss"); - put("^\\d{4}/\\d{1,2}/\\d{1,2}\\s\\d{1,2}:\\d{2}:\\d{2}$", "yyyy/MM/dd HH:mm:ss"); - put("^\\d{1,2}\\s[a-z]{3}\\s\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "dd MMM yyyy HH:mm:ss"); - put("^\\d{1,2}\\s[a-z]{4,}\\s\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "dd MMMM yyyy HH:mm:ss"); - } - }; - - /** - * Determine SimpleDateFormat pattern matching with the given date string. Returns null if format is unknown. You - * can simply extend DateUtil with more formats if needed. - * - * @param dateString - * The date string to determine the SimpleDateFormat pattern for. - * @return The matching SimpleDateFormat pattern, or null if format is unknown. - * @see java.text.SimpleDateFormat - */ - public static String determineDateFormat(String dateString) - throws ParseException - { - for (String regexp : DATE_FORMAT_REGEXPS.keySet()) - if (dateString.toLowerCase().matches(regexp)) - return DATE_FORMAT_REGEXPS.get(regexp); - - throw new ParseException("unable to parse date",0); - } - - public static DateTime determineDate(String dateString) - throws ParseException - { - // Trim the string just in case it is dirty. - dateString = dateString.trim(); - - // check to see if it looks like it is millis. If so, parse as millis and return. - if(dateString.matches(REGEX_ONLY_NUMBERS)) - return new DateTime(new Date(Long.parseLong(dateString))); - - try - { - // try to parse the string into a java.date object, if possible. - SimpleDateFormat dateFormat = new SimpleDateFormat(determineDateFormat(dateString)); - dateFormat.setLenient(false); - return new DateTime(dateFormat.parse(dateString)); - } - catch(Exception e) - { - - } - - return new DateTime(DateTime.parse(dateString)); - } - - public static DateTime determineDateTime(String dateString) - throws ParseException - { - return new DateTime(determineDate(dateString)); - } - - public static DateTime determineDateTime(String dateString, DateTimeZone theTimeZone) - throws ParseException - { - DateTime beforeTimeZone = determineDateTime(dateString); - return new DateTime(beforeTimeZone.getYear(),beforeTimeZone.getMonthOfYear(), beforeTimeZone.getDayOfMonth(), beforeTimeZone.getHourOfDay(), beforeTimeZone.getMinuteOfHour(), beforeTimeZone.getSecondOfMinute(), beforeTimeZone.getMillisOfSecond(), theTimeZone); - } - - - public static String getAliasForDate(String date, String prefix) throws ParseException { - return getAliasesForDateRange(date, null, prefix).iterator().next(); - } - - public static String getAliasForDate(DateTime date, String prefix) throws ParseException { - return getAliasesForDateRange(date, null, prefix).iterator().next(); - } - - public static Set<String> getAliasesForDateRange(String starDate, String endDate, String prefix) - throws ParseException - { - DateTime start = null; - DateTime end = null; - DateTimeFormatter df = ISODateTimeFormat.dateTimeNoMillis(); - try { - start = df.parseDateTime(starDate); - } catch (Exception e) { - //do nothing. try to parse with other parsers - } - if(start == null) { - start = determineDateTime(starDate); - } - if(endDate != null) { - try { - end = df.parseDateTime(endDate); - } catch (Exception e) { - //do nothing. try to parse with other parsers - } - if( end == null) - end = determineDateTime(endDate); - } - return getAliasesForDateRange(start, end, prefix); - } - - public static Set<String> getAliasesForDateRange(DateTime startDate, DateTime endDate, String prefix) { - Set<String> aliases = new HashSet<String>(); - aliases.add(prefix+"_"+getDateAbbreviation(startDate.getYear(), startDate.getMonthOfYear())); - if(endDate == null) { - return aliases; - } - while(endDate.isAfter(startDate)) { - aliases.add(prefix+"_"+getDateAbbreviation(endDate.getYear(), endDate.getMonthOfYear())); - endDate = endDate.minusMonths(1); - } - return aliases; - } - - private static String getDateAbbreviation(int year, int month) { - if(month > 9) { - return Integer.toString(year)+Integer.toString(month); - } - else { - return Integer.toString(year)+"0"+Integer.toString(month); - } - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java b/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java index 1972bc7..2d129de 100644 --- a/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java +++ b/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java @@ -29,20 +29,26 @@ import java.nio.charset.Charset; */ public class GuidUtils { - private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); + private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); - public static String generateGuid(String... parts) { + /** + * generateGuid from list of parts. + * @param parts list of parts + * @return guid + */ + public static String generateGuid(String... parts) { - StringBuilder seed = new StringBuilder(); - for( String part : parts ) { - Preconditions.checkNotNull(part); - Preconditions.checkArgument(!Strings.isNullOrEmpty(part)); - seed.append(part); - } + StringBuilder seed = new StringBuilder(); - String hash = Hashing.goodFastHash(24).hashString(seed, UTF8_CHARSET).asBytes().toString(); + for ( String part : parts ) { + Preconditions.checkNotNull(part); + Preconditions.checkArgument(!Strings.isNullOrEmpty(part)); + seed.append(part); + } - return hash; + String hash = Hashing.goodFastHash(24).hashString(seed, UTF8_CHARSET).asBytes().toString(); - } + return hash; + + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java b/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java index de324d2..ba22d3d 100644 --- a/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java +++ b/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java @@ -20,7 +20,11 @@ package org.apache.streams.util; import org.apache.commons.io.input.ClassLoaderObjectInputStream; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; /** * SerializationUtil contains methods for serializing, deserializing, and cloning @@ -28,47 +32,62 @@ import java.io.*; */ public class SerializationUtil { - /** - * BORROwED FROM APACHE STORM PROJECT - * @param obj - * @return - */ - public static byte[] serialize(Object obj) { - try { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(bos); - oos.writeObject(obj); - oos.close(); - return bos.toByteArray(); - } catch(IOException ioe) { - throw new RuntimeException(ioe); - } + /** + * serialize Object as byte array. + * + * <p/> + * BORROwED FROM APACHE STORM PROJECT + * + * @param obj Object + * @return byte[] + */ + public static byte[] serialize(Object obj) { + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(obj); + oos.close(); + return bos.toByteArray(); + } catch (IOException ioe) { + throw new RuntimeException(ioe); } + } - /** - * BORROwED FROM APACHE STORM PROJECT - * @param serialized - * @return - */ - public static Object deserialize(byte[] serialized) { - try { - ByteArrayInputStream bis = new ByteArrayInputStream(serialized); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - ObjectInputStream ois = new ClassLoaderObjectInputStream(classLoader, bis); - Object ret = ois.readObject(); - ois.close(); - return ret; - } catch(IOException ioe) { - throw new RuntimeException(ioe); - } catch(ClassNotFoundException e) { - throw new RuntimeException(e); - } + /** + * deserialize byte array as Object. + * + * <p/> + * BORROwED FROM APACHE STORM PROJECT + * + * @param serialized byte[] + * @return Object + */ + public static Object deserialize(byte[] serialized) { + try { + ByteArrayInputStream bis = new ByteArrayInputStream(serialized); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + ObjectInputStream ois = new ClassLoaderObjectInputStream(classLoader, bis); + Object ret = ois.readObject(); + ois.close(); + return ret; + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } catch (ClassNotFoundException ex) { + throw new RuntimeException(ex); } + } - - public static <T> T cloneBySerialization(T obj) { - if( obj != null ) - return (T) deserialize(serialize(obj)); - else return null; + /** + * clone Object by serialization. + * @param obj Object + * @param <T> type + * @return cloned Object + */ + public static <T> T cloneBySerialization(T obj) { + if ( obj != null ) { + return (T) deserialize(serialize(obj)); + } else { + return null; } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java index 7fbfc6b..3dc3e08 100644 --- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java +++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java @@ -22,61 +22,64 @@ import java.util.concurrent.atomic.AtomicInteger; */ public abstract class AbstractBackOffStrategy implements BackOffStrategy { - private long baseSleepTime; - private long lastSleepTime; - private int maxAttempts; - private AtomicInteger attemptsCount; + private long baseSleepTime; + private long lastSleepTime; + private int maxAttempts; + private AtomicInteger attemptsCount; - /** - * A BackOffStrategy that can effectively be used endlessly. - * @param baseBackOffTime amount of time back of in seconds - */ - public AbstractBackOffStrategy(long baseBackOffTime) { - this(baseBackOffTime, -1); - } + /** + * A BackOffStrategy that can effectively be used endlessly. + * @param baseBackOffTime amount of time back of in seconds + */ + public AbstractBackOffStrategy(long baseBackOffTime) { + this(baseBackOffTime, -1); + } - /** - * A BackOffStrategy that has a limited number of uses before it throws a {@link org.apache.streams.util.api.requests.backoff.BackOffException} - * @param baseBackOffTime time to back off in milliseconds, must be greater than 0. - * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be grater than 0 or -1. -1 indicates there is no maximum number of attempts. - */ - public AbstractBackOffStrategy(long baseBackOffTime, int maximumNumberOfBackOffAttempts) { - if(baseBackOffTime <= 0) { - throw new IllegalArgumentException("backOffTimeInMilliSeconds is not greater than 0 : "+baseBackOffTime); - } - if(maximumNumberOfBackOffAttempts<=0 && maximumNumberOfBackOffAttempts != -1) { - throw new IllegalArgumentException("maximumNumberOfBackOffAttempts is not greater than 0 : "+maximumNumberOfBackOffAttempts); - } - this.baseSleepTime = baseBackOffTime; - this.maxAttempts = maximumNumberOfBackOffAttempts; - this.attemptsCount = new AtomicInteger(0); + /** + * A BackOffStrategy that has a limited number of uses before it throws a + * {@link org.apache.streams.util.api.requests.backoff.BackOffException}. + * @param baseBackOffTime time to back off in milliseconds, must be greater than 0. + * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be grater than 0 or -1. + * -1 indicates there is no maximum number of attempts. + */ + public AbstractBackOffStrategy(long baseBackOffTime, int maximumNumberOfBackOffAttempts) { + if (baseBackOffTime <= 0) { + throw new IllegalArgumentException("backOffTimeInMilliSeconds is not greater than 0 : " + baseBackOffTime); } - - @Override - public void backOff() throws BackOffException { - int attempt = this.attemptsCount.getAndIncrement(); - if(attempt >= this.maxAttempts && this.maxAttempts != -1) { - throw new BackOffException(attempt, this.lastSleepTime); - } else { - try { - Thread.sleep(this.lastSleepTime = calculateBackOffTime(attempt, this.baseSleepTime)); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - } + if (maximumNumberOfBackOffAttempts <= 0 && maximumNumberOfBackOffAttempts != -1) { + throw new IllegalArgumentException("maximumNumberOfBackOffAttempts is not greater than 0 : " + maximumNumberOfBackOffAttempts); } + this.baseSleepTime = baseBackOffTime; + this.maxAttempts = maximumNumberOfBackOffAttempts; + this.attemptsCount = new AtomicInteger(0); + } - @Override - public void reset() { - this.attemptsCount.set(0); + @Override + public void backOff() throws BackOffException { + int attempt = this.attemptsCount.getAndIncrement(); + if (attempt >= this.maxAttempts && this.maxAttempts != -1) { + throw new BackOffException(attempt, this.lastSleepTime); + } else { + try { + Thread.sleep(this.lastSleepTime = calculateBackOffTime(attempt, this.baseSleepTime)); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } } + } + + @Override + public void reset() { + this.attemptsCount.set(0); + } - /** - * Calculate the amount of time in milliseconds that the strategy should back off for - * @param attemptCount the number of attempts the strategy has backed off. i.e. 1 -> this is the first attempt, 2 -> this is the second attempt, etc. - * @param baseSleepTime the minimum amount of time it should back off for in milliseconds - * @return the amount of time it should back off in milliseconds - */ - protected abstract long calculateBackOffTime(int attemptCount, long baseSleepTime); + /** + * Calculate the amount of time in milliseconds that the strategy should back off for + * @param attemptCount the number of attempts the strategy has backed off. + * i.e. 1 -> this is the first attempt, 2 -> this is the second attempt, etc. + * @param baseSleepTime the minimum amount of time it should back off for in milliseconds + * @return the amount of time it should back off in milliseconds + */ + protected abstract long calculateBackOffTime(int attemptCount, long baseSleepTime); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java index 223303c..692c0b6 100644 --- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java +++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java @@ -21,43 +21,49 @@ package org.apache.streams.util.api.requests.backoff; */ public class BackOffException extends Exception { - private int attemptCount; - private long sleepTime; - - public BackOffException() { - this(-1, -1); - } - - public BackOffException(String message) { - this(message, -1, -1); - } - - public BackOffException(int attemptCount, long maxSleepTime) { - this.attemptCount = attemptCount; - this.sleepTime = maxSleepTime; - } - - public BackOffException(String message, int attemptCount, long maxSleepTime) { - super(message); - this.attemptCount = attemptCount; - this.sleepTime = maxSleepTime; - } - - /** - * Gets the number of back off attempts that happened before the exception was thrown. If the function that - * initialized this exception does not set the number of attempts, -1 will be returned. - * @return number of attempts - */ - public int getNumberOfBackOffsAttempted() { - return this.attemptCount; - } - - /** - * Gets the longest sleep period that the strategy attempted. If the function that - * initialized this exception does not set the longest sleep period, -1 will be returned. - * @return - */ - public long getLongestBackOff() { - return this.sleepTime; - } + private int attemptCount; + private long sleepTime; + + public BackOffException() { + this(-1, -1); + } + + public BackOffException(String message) { + this(message, -1, -1); + } + + public BackOffException(int attemptCount, long maxSleepTime) { + this.attemptCount = attemptCount; + this.sleepTime = maxSleepTime; + } + + /** + * BackOffException constructor. + * @param message message + * @param attemptCount attemptCount + * @param maxSleepTime maxSleepTime (in millis) + */ + public BackOffException(String message, int attemptCount, long maxSleepTime) { + super(message); + this.attemptCount = attemptCount; + this.sleepTime = maxSleepTime; + } + + /** + * Gets the number of back off attempts that happened before the exception was thrown. If the function that + * initialized this exception does not set the number of attempts, -1 will be returned. + * @return number of back off attempts + */ + public int getNumberOfBackOffsAttempted() { + return this.attemptCount; + } + + /** + * Gets the longest sleep period that the strategy attempted. If the function that + * initialized this exception does not set the longest sleep period, -1 will be returned. + * @return longest sleep period that the strategy attempted + */ + public long getLongestBackOff() { + return this.sleepTime; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java index a0d80e8..44497ab 100644 --- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java +++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java @@ -19,6 +19,7 @@ package org.apache.streams.util.api.requests.backoff; * BackOffStrategy will cause the current thread to sleep for a specific amount of time. This is used to adhere to * api rate limits. * + * <p/> * The example below illustrates using a BackOffStrategy to slow down requests when you hit a rate limit exception. * * <code> @@ -36,16 +37,17 @@ package org.apache.streams.util.api.requests.backoff; */ public interface BackOffStrategy { - /** - * Cause the current thread to sleep for an amount of time based on the implemented strategy. If limits are set - * on the number of times the backOff can be called, an exception will be thrown. - * @throws BackOffException - */ - public void backOff() throws BackOffException; + /** + * Cause the current thread to sleep for an amount of time based on the implemented strategy. If limits are set + * on the number of times the backOff can be called, an exception will be thrown. + * @throws BackOffException BackOffException + */ + public void backOff() throws BackOffException; - /** - * Rests the back off strategy to its original state. After the call the strategy will act as if {@link AbstractBackOffStrategy#backOff()} - * has never been called. - */ - public void reset(); + /** + * Rests the back off strategy to its original state. + * After the call the strategy will act as if {@link AbstractBackOffStrategy#backOff()} + * has never been called. + */ + public void reset(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java index b3fd3f2..26ec225 100644 --- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java +++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java @@ -24,25 +24,27 @@ import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy; */ public class ConstantTimeBackOffStrategy extends AbstractBackOffStrategy { - /** - * A ConstantTimeBackOffStrategy that can effectively be used endlessly. - * @param baseBackOffTimeInMiliseconds amount of time back of in milliseconds - */ - public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds) { - this(baseBackOffTimeInMiliseconds, -1); - } + /** + * A ConstantTimeBackOffStrategy that can effectively be used endlessly. + * @param baseBackOffTimeInMiliseconds amount of time back of in milliseconds + */ + public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds) { + this(baseBackOffTimeInMiliseconds, -1); + } - /** - * A ConstantTimeBackOffStrategy that has a limited number of uses before it throws a {@link org.apache.streams.util.api.requests.backoff.BackOffException} - * @param baseBackOffTimeInMiliseconds time to back off in milliseconds, must be greater than 0. - * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be grater than 0 or -1. -1 indicates there is no maximum number of attempts. - */ - public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds, int maximumNumberOfBackOffAttempts) { - super(baseBackOffTimeInMiliseconds, maximumNumberOfBackOffAttempts); - } + /** + * A ConstantTimeBackOffStrategy that has a limited number of uses before it + * throws a {@link org.apache.streams.util.api.requests.backoff.BackOffException} + * @param baseBackOffTimeInMiliseconds time to back off in milliseconds, must be greater than 0. + * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be grater than 0 or -1. + * -1 indicates there is no maximum number of attempts. + */ + public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds, int maximumNumberOfBackOffAttempts) { + super(baseBackOffTimeInMiliseconds, maximumNumberOfBackOffAttempts); + } - @Override - protected long calculateBackOffTime(int attemptCount, long baseSleepTime) { - return baseSleepTime; - } + @Override + protected long calculateBackOffTime(int attemptCount, long baseSleepTime) { + return baseSleepTime; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java index a5a9656..0962984 100644 --- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java +++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java @@ -18,30 +18,29 @@ package org.apache.streams.util.api.requests.backoff.impl; import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy; /** - * Exponential backk strategy. Caluclated by baseBackOffTimeInSeconds raised the attempt-count power. + * Exponential backoff strategy. Calculated by baseBackOffTimeInSeconds raised the attempt-count power. */ public class ExponentialBackOffStrategy extends AbstractBackOffStrategy { + /** + * Unlimited use ExponentialBackOffStrategy. + * @param baseBackOffTimeInSeconds baseBackOffTimeInSeconds + */ + public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds) { + this(baseBackOffTimeInSeconds, -1); + } - /** - * Unlimited use ExponentialBackOffStrategy - * @param baseBackOffTimeInSeconds - */ - public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds) { - this(baseBackOffTimeInSeconds, -1); - } + /** + * Limited use ExponentialBackOffStrategy. + * @param baseBackOffTimeInSeconds baseBackOffTimeInSeconds + * @param maxNumAttempts maxNumAttempts + */ + public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds, int maxNumAttempts) { + super(baseBackOffTimeInSeconds, maxNumAttempts); + } - /** - * Limited use ExponentialBackOffStrategy - * @param baseBackOffTimeInSeconds - * @param maxNumAttempts - */ - public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds, int maxNumAttempts) { - super(baseBackOffTimeInSeconds, maxNumAttempts); - } - - @Override - protected long calculateBackOffTime(int attemptCount, long baseSleepTime) { - return Math.round(Math.pow(baseSleepTime, attemptCount)) * 1000; - } + @Override + protected long calculateBackOffTime(int attemptCount, long baseSleepTime) { + return Math.round(Math.pow(baseSleepTime, attemptCount)) * 1000; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java index 38d05a1..d6f323f 100644 --- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java +++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java @@ -24,17 +24,16 @@ import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy; */ public class LinearTimeBackOffStrategy extends AbstractBackOffStrategy { + public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds) { + this(baseBackOffTimeInSeconds, -1); + } - public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds) { - this(baseBackOffTimeInSeconds, -1); - } + public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds, int maxAttempts) { + super(baseBackOffTimeInSeconds, -1); + } - public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds, int maxAttempts) { - super(baseBackOffTimeInSeconds, -1); - } - - @Override - protected long calculateBackOffTime(int attemptCount, long baseSleepTime) { - return 1000L * attemptCount * baseSleepTime; - } + @Override + protected long calculateBackOffTime(int attemptCount, long baseSleepTime) { + return 1000L * attemptCount * baseSleepTime; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java index dfdec72..41ec4b6 100644 --- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java +++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java @@ -12,22 +12,23 @@ software distributed under the License is distributed on an KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package org.apache.streams.util.oauth.tokens; /** - * + * AbstractOauthToken. */ public abstract class AbstractOauthToken { - /** - * Must create equals method for all OauthTokens. - * @param o - * @return true if equal, and false otherwise - */ - protected abstract boolean internalEquals(Object o); + /** + * Must create equals method for all OauthTokens. + * @param object object for comparison + * @return true if equal, and false otherwise + */ + protected abstract boolean internalEquals(Object object); - @Override - public boolean equals(Object o) { - return this.internalEquals(o); - } + @Override + public boolean equals(Object object) { + return this.internalEquals(object); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java index fed194f..7b3f370 100644 --- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java +++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java @@ -12,42 +12,40 @@ software distributed under the License is distributed on an KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package org.apache.streams.util.oauth.tokens.tokenmanager; -import org.apache.streams.util.oauth.tokens.AbstractOauthToken; +package org.apache.streams.util.oauth.tokens.tokenmanager; import java.util.Collection; /** - * Manges access to oauth tokens. Allows a caller to add tokens to the token pool and receive an available token. + * Manages access to oauth tokens. Allows a caller to add tokens to the token pool and receive an available token. */ public interface SimpleTokenManager<T> { - - /** - * Adds a token to the available token pool. - * @param token Token to be added - * @return true, if token was successfully added to the pool and false otherwise. - */ - public boolean addTokenToPool(T token); - - /** - * Adds a {@link java.util.Collection} of tokens to the available token pool. - * @param tokens Tokens to be added - * @return true, if the token pool size increased after adding the tokens, and false otherwise. - */ - public boolean addAllTokensToPool(Collection<T> tokens); - - /** - * Get an available token. If no tokens are available it returns null. - * @return next available token - */ - public T getNextAvailableToken(); - - /** - * Get the number of available tokens - * @return number of available tokens - */ - public int numAvailableTokens(); + /** + * Adds a token to the available token pool. + * @param token Token to be added + * @return true, if token was successfully added to the pool and false otherwise. + */ + public boolean addTokenToPool(T token); + + /** + * Adds a {@link java.util.Collection} of tokens to the available token pool. + * @param tokens Tokens to be added + * @return true, if the token pool size increased after adding the tokens, and false otherwise. + */ + public boolean addAllTokensToPool(Collection<T> tokens); + + /** + * Get an available token. If no tokens are available it returns null. + * @return next available token + */ + public T getNextAvailableToken(); + + /** + * Get the number of available tokens. + * @return number of available tokens + */ + public int numAvailableTokens(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManager.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManager.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManager.java new file mode 100644 index 0000000..7c1a9e3 --- /dev/null +++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManager.java @@ -0,0 +1,94 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance * +http://www.apache.org/licenses/LICENSE-2.0 * +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. */ + +package org.apache.streams.util.oauth.tokens.tokenmanager.impl; + +import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager; + +import java.util.ArrayList; +import java.util.Collection; + +/** + * Manages a pool of tokens the most basic possible way. + * If all tokens are added to the manager before {@link BasicTokenManager#getNextAvailableToken() getNextAvailableToken} + * is called tokens are issued in the order they were added to the manager, FIFO. The BasicTokenManager acts as a circular queue + * of tokens. Once the manager issues all available tokens it will cycle back to the first token and start issuing tokens again. + * + * </p> + * When adding tokens to the pool of available tokens, the manager will not add tokens that are already in the pool. + * + * <p/> + * The manager class is thread safe. + */ +public class BasicTokenManager<T> implements SimpleTokenManager<T> { + + private ArrayList<T> availableTokens; + private int nextToken; + + public BasicTokenManager() { + this(null); + } + + /** + * BasicTokenManager constructor. + * @param tokens Collection of tokens + */ + public BasicTokenManager(Collection<T> tokens) { + if (tokens != null) { + this.availableTokens = new ArrayList<T>(tokens.size()); + this.addAllTokensToPool(tokens); + } else { + this.availableTokens = new ArrayList<T>(); + } + this.nextToken = 0; + } + + @Override + public synchronized boolean addTokenToPool(T token) { + if (token == null || this.availableTokens.contains(token)) { + return false; + } else { + return this.availableTokens.add(token); + } + } + + @Override + public synchronized boolean addAllTokensToPool(Collection<T> tokens) { + int startSize = this.availableTokens.size(); + for (T token : tokens) { + this.addTokenToPool(token); + } + return startSize < this.availableTokens.size(); + } + + @Override + public synchronized T getNextAvailableToken() { + T token = null; + if (this.availableTokens.size() == 0) { + return token; + } else { + token = this.availableTokens.get(nextToken++); + if (nextToken == this.availableTokens.size()) { + nextToken = 0; + } + return token; + } + } + + @Override + public synchronized int numAvailableTokens() { + return this.availableTokens.size(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java deleted file mode 100644 index 4c64bf7..0000000 --- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java +++ /dev/null @@ -1,86 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance * -http://www.apache.org/licenses/LICENSE-2.0 * -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. */ -package org.apache.streams.util.oauth.tokens.tokenmanager.impl; - -import org.apache.streams.util.oauth.tokens.AbstractOauthToken; -import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager; - -import java.util.ArrayList; -import java.util.Collection; - -/** - * Manages a pool of tokens the most basic possible way. If all tokens are added to the manager before {@link BasicTokenManger#getNextAvailableToken() getNextAvailableToken} - * is called tokens are issued in the order they were added to the manager, FIFO. The BasicTokenManager acts as a circular queue - * of tokens. Once the manager issues all available tokens it will cycle back to the first token and start issuing tokens again. - * - * When adding tokens to the pool of available tokens, the manager will not add tokens that are already in the pool. - * - * The manager class is thread safe. - */ -public class BasicTokenManger<T> implements SimpleTokenManager<T>{ - - private ArrayList<T> availableTokens; - private int nextToken; - - public BasicTokenManger() { - this(null); - } - - public BasicTokenManger(Collection<T> tokens) { - if(tokens != null) { - this.availableTokens = new ArrayList<T>(tokens.size()); - this.addAllTokensToPool(tokens); - } else { - this.availableTokens = new ArrayList<T>(); - } - this.nextToken = 0; - } - - @Override - public synchronized boolean addTokenToPool(T token) { - if(token == null || this.availableTokens.contains(token)) - return false; - else - return this.availableTokens.add(token); - } - - @Override - public synchronized boolean addAllTokensToPool(Collection<T> tokens) { - int startSize = this.availableTokens.size(); - for(T token : tokens) { - this.addTokenToPool(token); - } - return startSize < this.availableTokens.size(); - } - - @Override - public synchronized T getNextAvailableToken() { - T token = null; - if(this.availableTokens.size() == 0) { - return token; - } else { - token = this.availableTokens.get(nextToken++); - if(nextToken == this.availableTokens.size()) { - nextToken = 0; - } - return token; - } - } - - @Override - public synchronized int numAvailableTokens() { - return this.availableTokens.size(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java b/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java index 450851e..57a1d44 100644 --- a/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java +++ b/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java @@ -15,6 +15,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.util.schema; /** @@ -22,10 +23,10 @@ package org.apache.streams.util.schema; * be able to translate. */ public enum FieldType { - STRING, - INTEGER, - NUMBER, - BOOLEAN, - OBJECT, - ARRAY + STRING, + INTEGER, + NUMBER, + BOOLEAN, + OBJECT, + ARRAY } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java b/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java index 6582565..a437ca4 100644 --- a/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java +++ b/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java @@ -15,11 +15,9 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.util.schema; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; /** @@ -27,25 +25,32 @@ import com.fasterxml.jackson.databind.node.ObjectNode; */ public class FieldUtil { - public static FieldType determineFieldType(ObjectNode fieldNode) { - String typeSchemaField = "type"; - if( !fieldNode.has(typeSchemaField)) - return null; - String typeSchemaFieldValue = fieldNode.get(typeSchemaField).asText(); - if( typeSchemaFieldValue.equals("string")) { - return FieldType.STRING; - } else if( typeSchemaFieldValue.equals("integer")) { - return FieldType.INTEGER; - } else if( typeSchemaFieldValue.equals("number")) { - return FieldType.NUMBER; - } else if( typeSchemaFieldValue.equals("object")) { - return FieldType.OBJECT; - } else if( typeSchemaFieldValue.equals("boolean")) { - return FieldType.BOOLEAN; - } else if( typeSchemaFieldValue.equals("array")) { - return FieldType.ARRAY; - } - else return null; + /** + * determine FieldType from ObjectNode. + * @param fieldNode ObjectNode + * @return FieldType + */ + public static FieldType determineFieldType(ObjectNode fieldNode) { + String typeSchemaField = "type"; + if ( !fieldNode.has(typeSchemaField)) { + return null; + } + String typeSchemaFieldValue = fieldNode.get(typeSchemaField).asText(); + if ( typeSchemaFieldValue.equals("string")) { + return FieldType.STRING; + } else if ( typeSchemaFieldValue.equals("integer")) { + return FieldType.INTEGER; + } else if ( typeSchemaFieldValue.equals("number")) { + return FieldType.NUMBER; + } else if ( typeSchemaFieldValue.equals("object")) { + return FieldType.OBJECT; + } else if ( typeSchemaFieldValue.equals("boolean")) { + return FieldType.BOOLEAN; + } else if ( typeSchemaFieldValue.equals("array")) { + return FieldType.ARRAY; + } else { + return null; } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java b/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java index c51339a..5acd5a8 100644 --- a/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java +++ b/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java @@ -15,6 +15,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.util.schema; import com.google.common.base.Preconditions; @@ -35,60 +36,93 @@ import java.util.List; */ public class FileUtil { - private final static Logger LOGGER = LoggerFactory.getLogger(FileUtil.class); + private static final Logger LOGGER = LoggerFactory.getLogger(FileUtil.class); - public static String dropSourcePathPrefix(String inputFile, String sourceDirectory) { - if(Strings.isNullOrEmpty(sourceDirectory)) - return inputFile; - else { - try { - if( inputFile.contains(sourceDirectory) && inputFile.indexOf(sourceDirectory) > 0) { - return inputFile.substring(inputFile.indexOf(sourceDirectory)+sourceDirectory.length()+1); - } - } catch( Throwable e ) { - return inputFile; - } + /** + * drop source path prefix between inputFile and sourceDirectory. + * @param inputFile inputFile + * @param sourceDirectory sourceDirectory + * @return without path prefix + */ + public static String dropSourcePathPrefix(String inputFile, String sourceDirectory) { + if (Strings.isNullOrEmpty(sourceDirectory)) { + return inputFile; + } else { + try { + if ( inputFile.contains(sourceDirectory) && inputFile.indexOf(sourceDirectory) > 0) { + return inputFile.substring(inputFile.indexOf(sourceDirectory) + sourceDirectory.length() + 1); } + } catch ( Throwable throwable ) { return inputFile; + } } + return inputFile; + } - public static String swapExtension(String inputFile, String originalExtension, String newExtension) { - if(inputFile.endsWith("."+originalExtension)) - return inputFile.replace("."+originalExtension, "."+newExtension); - else return inputFile; + /** + * swapExtension. + * @param inputFile inputFile + * @param originalExtension originalExtension + * @param newExtension newExtension + * @return extension swapped + */ + public static String swapExtension(String inputFile, String originalExtension, String newExtension) { + if (inputFile.endsWith("." + originalExtension)) { + return inputFile.replace("." + originalExtension, "." + newExtension); + } else { + return inputFile; } + } - public static String dropExtension(String inputFile) { - if(inputFile.contains(".")) - return inputFile.substring(0, inputFile.lastIndexOf(".")); - else return inputFile; + /** + * dropExtension. + * @param inputFile inputFile + * @return extension dropped + */ + public static String dropExtension(String inputFile) { + if (inputFile.contains(".")) { + return inputFile.substring(0, inputFile.lastIndexOf(".")); + } else { + return inputFile; } + } - public static void writeFile(String resourceFile, String resourceContent) { - try { - File path = new File(resourceFile); - File dir = path.getParentFile(); - if( !dir.exists() ) - dir.mkdirs(); - Files.write(Paths.get(resourceFile), resourceContent.getBytes(), StandardOpenOption.CREATE_NEW); - } catch (Exception e) { - LOGGER.error("Write Exception: {}", e); - } + /** + * writeFile. + * @param resourceFile resourceFile + * @param resourceContent resourceContent + */ + public static void writeFile(String resourceFile, String resourceContent) { + try { + File path = new File(resourceFile); + File dir = path.getParentFile(); + if ( !dir.exists() ) { + dir.mkdirs(); + } + Files.write(Paths.get(resourceFile), resourceContent.getBytes(), StandardOpenOption.CREATE_NEW); + } catch (Exception ex) { + LOGGER.error("Write Exception: {}", ex); } + } - public static void resolveRecursive(GenerationConfig config, List<File> schemaFiles) { - - Preconditions.checkArgument(schemaFiles.size() > 0); - int i = 0; - while( schemaFiles.size() > i) { - File child = schemaFiles.get(i); - if (child.isDirectory()) { - schemaFiles.addAll(Arrays.asList(child.listFiles(config.getFileFilter()))); - schemaFiles.remove(child); - } else { - i += 1; - } - } + /** + * resolveRecursive. + * @param config GenerationConfig + * @param schemaFiles List of schemaFiles + */ + public static void resolveRecursive(GenerationConfig config, List<File> schemaFiles) { + Preconditions.checkArgument(schemaFiles.size() > 0); + int index = 0; + while ( schemaFiles.size() > index) { + File child = schemaFiles.get(index); + if (child.isDirectory()) { + schemaFiles.addAll(Arrays.asList(child.listFiles(config.getFileFilter()))); + schemaFiles.remove(child); + } else { + index += 1; + } } + + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java ---------------------------------------------------------------------- diff --git a/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java b/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java index c48d186..d7fa7e7 100644 --- a/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java +++ b/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java @@ -15,6 +15,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.util.schema; import java.io.File; @@ -24,110 +25,39 @@ import java.util.Iterator; /** * GenerationConfig represents the common fields and field accessors for - * streams modules that transform schemas into generated-sources or generated-resources + * streams modules that transform schemas into generated-sources or generated-resources. */ public interface GenerationConfig { - /** - * Gets the 'source' configuration option. - * - * @return The source file(s) or directory(ies) from which JSON Schema will - * be read. - */ - Iterator<URL> getSource(); - - /** - * Gets the 'targetDirectory' configuration option. - * - * @return The target directory into which generated types will be written - * (may or may not exist before types are written) - */ - File getTargetDirectory(); - - /** - * Gets the 'outputEncoding' configuration option. - * - * @return The character encoding that should be used when writing output files. - */ - String getOutputEncoding(); - - /** - * Gets the file filter used to isolate the schema mapping files in the - * source directories. - * - * @return the file filter use when scanning for schema files. - */ - FileFilter getFileFilter(); - - /** - * Gets the 'includeAdditionalProperties' configuration option. - * - * @return Whether to allow 'additional properties' support in objects. - * Setting this to false will disable additional properties support, - * regardless of the input schema(s). - */ -// boolean isIncludeAdditionalProperties(); - - /** - * Gets the 'targetVersion' configuration option. - * - * @return The target version for generated source files. - */ -// String getTargetVersion(); - -// /** -// * Gets the `includeDynamicAccessors` configuraiton option. -// * -// * @return Whether to include dynamic getters, setters, and builders -// * or to omit these methods. -// */ -// boolean isIncludeDynamicAccessors(); - -// /** -// * Gets the `dateTimeType` configuration option. -// * <p> -// * Example values: -// * <ul> -// * <li><code>org.joda.time.LocalDateTime</code> (Joda)</li> -// * <li><code>java.time.LocalDateTime</code> (JSR310)</li> -// * <li><code>null</code> (default behavior)</li> -// * </ul> -// * -// * @return The java type to use instead of {@link java.util.Date} -// * when adding date type fields to generate Java types. -// */ -// String getDateTimeType(); -// -// /** -// * Gets the `dateType` configuration option. -// * <p> -// * Example values: -// * <ul> -// * <li><code>org.joda.time.LocalDate</code> (Joda)</li> -// * <li><code>java.time.LocalDate</code> (JSR310)</li> -// * <li><code>null</code> (default behavior)</li> -// * </ul> -// * -// * @return The java type to use instead of string -// * when adding string type fields with a format of date (not -// * date-time) to generated Java types. -// */ -// String getDateType(); -// -// /** -// * Gets the `timeType` configuration option. -// * <p> -// * Example values: -// * <ul> -// * <li><code>org.joda.time.LocalTime</code> (Joda)</li> -// * <li><code>java.time.LocalTime</code> (JSR310)</li> -// * <li><code>null</code> (default behavior)</li> -// * </ul> -// * -// * @return The java type to use instead of string -// * when adding string type fields with a format of time (not -// * date-time) to generated Java types. -// */ -// String getTimeType(); + /** + * Gets the 'source' configuration option. + * + * @return The source file(s) or directory(ies) from which JSON Schema will + * be read. + */ + Iterator<URL> getSource(); + + /** + * Gets the 'targetDirectory' configuration option. + * + * @return The target directory into which generated types will be written + * (may or may not exist before types are written) + */ + File getTargetDirectory(); + + /** + * Gets the 'outputEncoding' configuration option. + * + * @return The character encoding that should be used when writing output files. + */ + String getOutputEncoding(); + + /** + * Gets the file filter used to isolate the schema mapping files in the + * source directories. + * + * @return the file filter use when scanning for schema files. + */ + FileFilter getFileFilter(); }
