Fixing stylecheck problems with storm-hdfs
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7da98cf0 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7da98cf0 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7da98cf0 Branch: refs/heads/master Commit: 7da98cf0c5d3e23fac42871974ff8017924673c5 Parents: 1872317 Author: Kishor Patil <[email protected]> Authored: Sun Apr 22 21:45:33 2018 -0400 Committer: Kishor Patil <[email protected]> Committed: Mon Apr 23 00:22:35 2018 -0400 ---------------------------------------------------------------------- external/storm-hdfs/pom.xml | 2 +- .../storm/hdfs/avro/AbstractAvroSerializer.java | 33 +- .../storm/hdfs/avro/AvroSchemaRegistry.java | 22 +- .../org/apache/storm/hdfs/avro/AvroUtils.java | 24 +- .../hdfs/avro/ConfluentAvroSerializer.java | 24 +- .../storm/hdfs/avro/FixedAvroSerializer.java | 30 +- .../storm/hdfs/avro/GenericAvroSerializer.java | 19 +- .../storm/hdfs/bolt/AbstractHdfsBolt.java | 79 ++- .../storm/hdfs/bolt/AvroGenericRecordBolt.java | 52 +- .../org/apache/storm/hdfs/bolt/HdfsBolt.java | 51 +- .../storm/hdfs/bolt/SequenceFileBolt.java | 50 +- .../java/org/apache/storm/hdfs/bolt/Writer.java | 18 +- .../hdfs/bolt/format/DefaultFileNameFormat.java | 32 +- .../hdfs/bolt/format/DefaultSequenceFormat.java | 12 +- .../hdfs/bolt/format/DelimitedRecordFormat.java | 29 +- .../storm/hdfs/bolt/format/FileNameFormat.java | 22 +- .../storm/hdfs/bolt/format/RecordFormat.java | 22 +- .../storm/hdfs/bolt/format/SequenceFormat.java | 7 +- .../hdfs/bolt/format/SimpleFileNameFormat.java | 38 +- .../hdfs/bolt/rotation/FileRotationPolicy.java | 22 +- .../bolt/rotation/FileSizeRotationPolicy.java | 62 +- .../hdfs/bolt/rotation/NoRotationPolicy.java | 19 +- .../hdfs/bolt/rotation/TimedRotationPolicy.java | 43 +- .../storm/hdfs/bolt/sync/CountSyncPolicy.java | 21 +- .../apache/storm/hdfs/bolt/sync/SyncPolicy.java | 22 +- .../storm/hdfs/common/AbstractHDFSWriter.java | 22 +- .../common/AvroGenericRecordHDFSWriter.java | 24 +- .../apache/storm/hdfs/common/HDFSWriter.java | 26 +- .../org/apache/storm/hdfs/common/HdfsUtils.java | 137 +++-- .../storm/hdfs/common/ModifTimeComparator.java | 27 +- .../storm/hdfs/common/NullPartitioner.java | 19 +- .../apache/storm/hdfs/common/Partitioner.java | 24 +- .../storm/hdfs/common/SequenceFileWriter.java | 24 +- .../hdfs/common/rotation/MoveFileAction.java | 6 +- .../hdfs/common/rotation/RotationAction.java | 6 +- .../storm/hdfs/spout/AbstractFileReader.java | 70 ++- .../org/apache/storm/hdfs/spout/Configs.java | 45 +- .../org/apache/storm/hdfs/spout/DirLock.java | 182 +++--- .../org/apache/storm/hdfs/spout/FileLock.java | 563 ++++++++++--------- .../org/apache/storm/hdfs/spout/FileOffset.java | 25 +- .../org/apache/storm/hdfs/spout/FileReader.java | 49 +- .../org/apache/storm/hdfs/spout/HdfsSpout.java | 161 +++--- .../apache/storm/hdfs/spout/ParseException.java | 24 +- .../storm/hdfs/spout/ProgressTracker.java | 86 ++- .../storm/hdfs/spout/SequenceFileReader.java | 340 ++++++----- .../apache/storm/hdfs/spout/TextFileReader.java | 304 +++++----- .../apache/storm/hdfs/trident/HdfsState.java | 322 +++++------ .../storm/hdfs/trident/HdfsStateFactory.java | 12 +- .../apache/storm/hdfs/trident/HdfsUpdater.java | 6 +- .../trident/format/DefaultFileNameFormat.java | 29 +- .../trident/format/DefaultSequenceFormat.java | 11 +- .../trident/format/DelimitedRecordFormat.java | 31 +- .../hdfs/trident/format/FileNameFormat.java | 19 +- .../storm/hdfs/trident/format/RecordFormat.java | 22 +- .../hdfs/trident/format/SequenceFormat.java | 7 +- .../trident/format/SimpleFileNameFormat.java | 36 +- .../trident/rotation/FileRotationPolicy.java | 24 +- .../rotation/FileSizeRotationPolicy.java | 64 +-- .../hdfs/trident/rotation/NoRotationPolicy.java | 19 +- .../trident/rotation/TimedRotationPolicy.java | 49 +- .../hdfs/trident/sync/CountSyncPolicy.java | 21 +- .../storm/hdfs/trident/sync/SyncPolicy.java | 22 +- .../hdfs/avro/TestFixedAvroSerializer.java | 44 +- .../hdfs/avro/TestGenericAvroSerializer.java | 33 +- .../hdfs/bolt/AvroGenericRecordBoltTest.java | 129 ++--- .../apache/storm/hdfs/bolt/TestHdfsBolt.java | 135 ++--- .../storm/hdfs/bolt/TestSequenceFileBolt.java | 115 ++-- .../apache/storm/hdfs/bolt/TestWritersMap.java | 22 +- .../bolt/format/TestSimpleFileNameFormat.java | 39 +- .../apache/storm/hdfs/spout/ConfigsTest.java | 46 +- .../apache/storm/hdfs/spout/TestDirLock.java | 29 +- .../apache/storm/hdfs/spout/TestFileLock.java | 47 +- .../storm/hdfs/spout/TestHdfsSemantics.java | 35 +- .../apache/storm/hdfs/spout/TestHdfsSpout.java | 201 ++++--- .../storm/hdfs/spout/TestProgressTracker.java | 21 +- .../storm/hdfs/testing/MiniDFSClusterRule.java | 8 +- .../storm/hdfs/trident/HdfsStateTest.java | 103 ++-- .../format/TestSimpleFileNameFormat.java | 30 +- 78 files changed, 2128 insertions(+), 2522 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml index 3fdc801..cb5baa2 100644 --- a/external/storm-hdfs/pom.xml +++ b/external/storm-hdfs/pom.xml @@ -267,7 +267,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>1406</maxAllowedViolations> + <maxAllowedViolations>189</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java index ddf015d..adb842a 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java @@ -1,26 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.avro; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import java.io.IOException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericContainer; import org.apache.avro.generic.GenericDatumReader; @@ -30,10 +26,9 @@ import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; -import java.io.IOException; - //Generously adapted from: -//https://github.com/kijiproject/kiji-express/blob/master/kiji-express/src/main/scala/org/kiji/express/flow/framework/serialization/AvroSerializer.scala +//https://github.com/kijiproject/kiji-express/blob/master/kiji-express/src/main/scala/org/kiji/express/flow/framework/serialization +// /AvroSerializer.scala //Which has as an ASL2.0 license /** @@ -52,8 +47,8 @@ public abstract class AbstractAvroSerializer extends Serializer<GenericContainer GenericDatumWriter<GenericContainer> writer = new GenericDatumWriter<>(record.getSchema()); BinaryEncoder encoder = EncoderFactory - .get() - .directBinaryEncoder(output, null); + .get() + .directBinaryEncoder(output, null); try { writer.write(record, encoder); } catch (IOException e) { @@ -66,8 +61,8 @@ public abstract class AbstractAvroSerializer extends Serializer<GenericContainer Schema theSchema = this.getSchema(input.readString()); GenericDatumReader<GenericContainer> reader = new GenericDatumReader<>(theSchema); Decoder decoder = DecoderFactory - .get() - .directBinaryDecoder(input, null); + .get() + .directBinaryDecoder(input, null); GenericContainer foo; try { http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java index 0d1dc8b..cca5099 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hdfs.avro; -import org.apache.avro.Schema; +package org.apache.storm.hdfs.avro; import java.io.Serializable; +import org.apache.avro.Schema; public interface AvroSchemaRegistry extends Serializable { String getFingerprint(Schema schema); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java index 5549291..13798bb 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.avro; import org.apache.avro.generic.GenericData; @@ -33,9 +28,8 @@ public class AvroUtils { public static void addAvroKryoSerializations(Config conf) throws ClassNotFoundException { final Class serializerClass; if (conf.containsKey("avro.serializer")) { - serializerClass = Class.forName((String)conf.get("avro.serializer")); - } - else { + serializerClass = Class.forName((String) conf.get("avro.serializer")); + } else { serializerClass = GenericAvroSerializer.class; } conf.registerSerialization(GenericData.Record.class, serializerClass); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java index bb03a11..17f3eb7 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java @@ -1,30 +1,24 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.avro; import com.esotericsoftware.kryo.Kryo; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import org.apache.avro.Schema; - import java.io.IOException; import java.util.Map; +import org.apache.avro.Schema; /** * This class provides a mechanism to utilize the Confluent Schema Registry (https://github.com/confluentinc/schema-registry) @@ -33,8 +27,8 @@ import java.util.Map; */ public class ConfluentAvroSerializer extends AbstractAvroSerializer { - private SchemaRegistryClient theClient; final private String url; + private SchemaRegistryClient theClient; /** * A constructor for use by test cases ONLY, thus the default scope. http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java index 4dd5fdc..128e802 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java @@ -1,25 +1,16 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hdfs.avro; -import org.apache.avro.Schema; -import org.apache.avro.SchemaNormalization; -import org.apache.commons.codec.binary.Base64; +package org.apache.storm.hdfs.avro; import java.io.BufferedReader; import java.io.IOException; @@ -28,6 +19,9 @@ import java.io.InputStreamReader; import java.security.NoSuchAlgorithmException; import java.util.HashMap; import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.SchemaNormalization; +import org.apache.commons.codec.binary.Base64; /** * A class to help (de)serialize a pre-defined set of Avro schemas. Schemas should be listed, one per line, in a file @@ -45,9 +39,9 @@ public class FixedAvroSerializer extends AbstractAvroSerializer { BufferedReader reader = new BufferedReader(new InputStreamReader(in)); String line; - while((line = reader.readLine()) != null) { + while ((line = reader.readLine()) != null) { Schema schema = new Schema.Parser().parse(line); - byte [] fp = SchemaNormalization.parsingFingerprint(FP_ALGO, schema); + byte[] fp = SchemaNormalization.parsingFingerprint(FP_ALGO, schema); String fingerPrint = new String(Base64.decodeBase64(fp)); fingerprint2schemaMap.put(fingerPrint, schema); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java index fedf698..6bb0e26 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.avro; import org.apache.avro.Schema; http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java index 10884a6..bcff104 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java @@ -1,28 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.bolt; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.TupleUtils; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -34,19 +32,15 @@ import org.apache.storm.hdfs.common.NullPartitioner; import org.apache.storm.hdfs.common.Partitioner; import org.apache.storm.hdfs.common.rotation.RotationAction; import org.apache.storm.hdfs.security.HdfsSecurityUtil; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; - public abstract class AbstractHdfsBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class); private static final Integer DEFAULT_RETRY_COUNT = 3; @@ -68,14 +62,13 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { protected String configKey; protected transient Object writeLock; protected transient Timer rotationTimer; // only used for TimedRotationPolicy - private List<Tuple> tupleBatch = new LinkedList<>(); protected long offset = 0; protected Integer fileRetryCount = DEFAULT_RETRY_COUNT; protected Integer tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS; protected Integer maxOpenFiles = DEFAULT_MAX_OPEN_FILES; protected Partitioner partitioner = new NullPartitioner(); - protected transient Configuration hdfsConfig; + private List<Tuple> tupleBatch = new LinkedList<>(); protected void rotateOutputFile(Writer writer) throws IOException { LOG.info("Rotating output file..."); @@ -98,7 +91,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { * @param topologyContext * @param collector */ - public final void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector){ + public final void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) { this.writeLock = new Object(); if (this.syncPolicy == null) throw new IllegalStateException("SyncPolicy must be specified."); if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified."); @@ -111,21 +104,21 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { this.collector = collector; this.fileNameFormat.prepare(conf, topologyContext); this.hdfsConfig = new Configuration(); - Map<String, Object> map = (Map<String, Object>)conf.get(this.configKey); - if(map != null){ - for(String key : map.keySet()){ + Map<String, Object> map = (Map<String, Object>) conf.get(this.configKey); + if (map != null) { + for (String key : map.keySet()) { this.hdfsConfig.set(key, String.valueOf(map.get(key))); } } - try{ + try { HdfsSecurityUtil.login(conf, hdfsConfig); doPrepare(conf, topologyContext, collector); - } catch (Exception e){ + } catch (Exception e) { throw new RuntimeException("Error preparing HdfsBolt: " + e.getMessage(), e); } - if(this.rotationPolicy instanceof TimedRotationPolicy){ + if (this.rotationPolicy instanceof TimedRotationPolicy) { startTimedRotationPolicy(); } } @@ -280,7 +273,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { } private void startTimedRotationPolicy() { - long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval(); + long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval(); this.rotationTimer = new Timer(true); TimerTask task = new TimerTask() { @Override @@ -295,8 +288,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { final String partitionPath = this.partitioner.getPartitionPath(tuple); final int rotation; - if (rotationCounterMap.containsKey(partitionPath)) - { + if (rotationCounterMap.containsKey(partitionPath)) { rotation = rotationCounterMap.get(partitionPath) + 1; } else { rotation = 0; @@ -304,10 +296,11 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { rotationCounterMap.put(partitionPath, rotation); return new Path(this.fsUrl + this.fileNameFormat.getPath() + partitionPath, - this.fileNameFormat.getName(rotation, System.currentTimeMillis())); + this.fileNameFormat.getName(rotation, System.currentTimeMillis())); } - abstract protected void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws IOException; + abstract protected void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws + IOException; abstract protected String getWriterKey(Tuple tuple); @@ -318,7 +311,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { final OutputCollector collector; public WritersMap(long maxWriters, OutputCollector collector) { - super((int)maxWriters, 0.75f, true); + super((int) maxWriters, 0.75f, true); this.maxWriters = maxWriters; this.collector = collector; } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java index 9ab0e12..f00df3a 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java @@ -1,73 +1,67 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.bolt; -import org.apache.storm.hdfs.common.AbstractHDFSWriter; -import org.apache.storm.hdfs.common.AvroGenericRecordHDFSWriter; -import org.apache.storm.hdfs.common.Partitioner; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Tuple; +import java.io.IOException; +import java.net.URI; +import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; - import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; +import org.apache.storm.hdfs.common.AbstractHDFSWriter; +import org.apache.storm.hdfs.common.AvroGenericRecordHDFSWriter; +import org.apache.storm.hdfs.common.Partitioner; import org.apache.storm.hdfs.common.rotation.RotationAction; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URI; -import java.util.Map; -public class AvroGenericRecordBolt extends AbstractHdfsBolt{ +public class AvroGenericRecordBolt extends AbstractHdfsBolt { private static final Logger LOG = LoggerFactory.getLogger(AvroGenericRecordBolt.class); - public AvroGenericRecordBolt withFsUrl(String fsUrl){ + public AvroGenericRecordBolt withFsUrl(String fsUrl) { this.fsUrl = fsUrl; return this; } - public AvroGenericRecordBolt withConfigKey(String configKey){ + public AvroGenericRecordBolt withConfigKey(String configKey) { this.configKey = configKey; return this; } - public AvroGenericRecordBolt withFileNameFormat(FileNameFormat fileNameFormat){ + public AvroGenericRecordBolt withFileNameFormat(FileNameFormat fileNameFormat) { this.fileNameFormat = fileNameFormat; return this; } - public AvroGenericRecordBolt withSyncPolicy(SyncPolicy syncPolicy){ + public AvroGenericRecordBolt withSyncPolicy(SyncPolicy syncPolicy) { this.syncPolicy = syncPolicy; return this; } - public AvroGenericRecordBolt withRotationPolicy(FileRotationPolicy rotationPolicy){ + public AvroGenericRecordBolt withRotationPolicy(FileRotationPolicy rotationPolicy) { this.rotationPolicy = rotationPolicy; return this; } - public AvroGenericRecordBolt addRotationAction(RotationAction action){ + public AvroGenericRecordBolt addRotationAction(RotationAction action) { this.rotationActions.add(action); return this; } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java index ba8b24e..6677c7b 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java @@ -1,25 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.bolt; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Tuple; +import java.io.IOException; +import java.net.URI; +import java.util.Map; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -31,51 +26,49 @@ import org.apache.storm.hdfs.common.AbstractHDFSWriter; import org.apache.storm.hdfs.common.HDFSWriter; import org.apache.storm.hdfs.common.Partitioner; import org.apache.storm.hdfs.common.rotation.RotationAction; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URI; -import java.util.EnumSet; -import java.util.Map; - -public class HdfsBolt extends AbstractHdfsBolt{ +public class HdfsBolt extends AbstractHdfsBolt { private static final Logger LOG = LoggerFactory.getLogger(HdfsBolt.class); private transient FSDataOutputStream out; private RecordFormat format; - public HdfsBolt withFsUrl(String fsUrl){ + public HdfsBolt withFsUrl(String fsUrl) { this.fsUrl = fsUrl; return this; } - public HdfsBolt withConfigKey(String configKey){ + public HdfsBolt withConfigKey(String configKey) { this.configKey = configKey; return this; } - public HdfsBolt withFileNameFormat(FileNameFormat fileNameFormat){ + public HdfsBolt withFileNameFormat(FileNameFormat fileNameFormat) { this.fileNameFormat = fileNameFormat; return this; } - public HdfsBolt withRecordFormat(RecordFormat format){ + public HdfsBolt withRecordFormat(RecordFormat format) { this.format = format; return this; } - public HdfsBolt withSyncPolicy(SyncPolicy syncPolicy){ + public HdfsBolt withSyncPolicy(SyncPolicy syncPolicy) { this.syncPolicy = syncPolicy; return this; } - public HdfsBolt withRotationPolicy(FileRotationPolicy rotationPolicy){ + public HdfsBolt withRotationPolicy(FileRotationPolicy rotationPolicy) { this.rotationPolicy = rotationPolicy; return this; } - public HdfsBolt addRotationAction(RotationAction action){ + public HdfsBolt addRotationAction(RotationAction action) { this.rotationActions.add(action); return this; } @@ -114,6 +107,6 @@ public class HdfsBolt extends AbstractHdfsBolt{ @Override protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException { this.out = this.fs.create(path); - return new HDFSWriter(rotationPolicy,path, out, format); + return new HDFSWriter(rotationPolicy, path, out, format); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java index b2120dc..c73c6a2 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java @@ -1,25 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.bolt; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Tuple; +import java.io.IOException; +import java.net.URI; +import java.util.Map; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; @@ -32,13 +27,12 @@ import org.apache.storm.hdfs.common.AbstractHDFSWriter; import org.apache.storm.hdfs.common.Partitioner; import org.apache.storm.hdfs.common.SequenceFileWriter; import org.apache.storm.hdfs.common.rotation.RotationAction; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URI; -import java.util.Map; - public class SequenceFileBolt extends AbstractHdfsBolt { private static final Logger LOG = LoggerFactory.getLogger(SequenceFileBolt.class); @@ -52,7 +46,7 @@ public class SequenceFileBolt extends AbstractHdfsBolt { public SequenceFileBolt() { } - public SequenceFileBolt withCompressionCodec(String codec){ + public SequenceFileBolt withCompressionCodec(String codec) { this.compressionCodec = codec; return this; } @@ -62,7 +56,7 @@ public class SequenceFileBolt extends AbstractHdfsBolt { return this; } - public SequenceFileBolt withConfigKey(String configKey){ + public SequenceFileBolt withConfigKey(String configKey) { this.configKey = configKey; return this; } @@ -87,7 +81,7 @@ public class SequenceFileBolt extends AbstractHdfsBolt { return this; } - public SequenceFileBolt withCompressionType(SequenceFile.CompressionType compressionType){ + public SequenceFileBolt withCompressionType(SequenceFile.CompressionType compressionType) { this.compressionType = compressionType; return this; } @@ -97,7 +91,7 @@ public class SequenceFileBolt extends AbstractHdfsBolt { return this; } - public SequenceFileBolt addRotationAction(RotationAction action){ + public SequenceFileBolt addRotationAction(RotationAction action) { this.rotationActions.add(action); return this; } @@ -134,11 +128,11 @@ public class SequenceFileBolt extends AbstractHdfsBolt { @Override protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException { SequenceFile.Writer writer = SequenceFile.createWriter( - this.hdfsConfig, - SequenceFile.Writer.file(path), - SequenceFile.Writer.keyClass(this.format.keyClass()), - SequenceFile.Writer.valueClass(this.format.valueClass()), - SequenceFile.Writer.compression(this.compressionType, this.codecFactory.getCodecByName(this.compressionCodec)) + this.hdfsConfig, + SequenceFile.Writer.file(path), + SequenceFile.Writer.keyClass(this.format.keyClass()), + SequenceFile.Writer.valueClass(this.format.valueClass()), + SequenceFile.Writer.compression(this.compressionType, this.codecFactory.getCodecByName(this.compressionCodec)) ); return new SequenceFileWriter(this.rotationPolicy, path, writer, this.format); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/Writer.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/Writer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/Writer.java index 9312f2c..c6b3fb3 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/Writer.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/Writer.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.bolt; http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java index a3afa18..14b4513 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hdfs.bolt.format; -import org.apache.storm.task.TopologyContext; +package org.apache.storm.hdfs.bolt.format; import java.util.Map; +import org.apache.storm.task.TopologyContext; /** @@ -48,7 +42,7 @@ public class DefaultFileNameFormat implements FileNameFormat { * @param prefix * @return */ - public DefaultFileNameFormat withPrefix(String prefix){ + public DefaultFileNameFormat withPrefix(String prefix) { this.prefix = prefix; return this; } @@ -59,12 +53,12 @@ public class DefaultFileNameFormat implements FileNameFormat { * @param extension * @return */ - public DefaultFileNameFormat withExtension(String extension){ + public DefaultFileNameFormat withExtension(String extension) { this.extension = extension; return this; } - public DefaultFileNameFormat withPath(String path){ + public DefaultFileNameFormat withPath(String path) { this.path = path; return this; } @@ -77,10 +71,10 @@ public class DefaultFileNameFormat implements FileNameFormat { @Override public String getName(long rotation, long timeStamp) { - return this.prefix + this.componentId + "-" + this.taskId + "-" + rotation + "-" + timeStamp + this.extension; + return this.prefix + this.componentId + "-" + this.taskId + "-" + rotation + "-" + timeStamp + this.extension; } - public String getPath(){ + public String getPath() { return this.path; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultSequenceFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultSequenceFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultSequenceFormat.java index ab07d43..8db7ffc 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultSequenceFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultSequenceFormat.java @@ -15,17 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.hdfs.bolt.format; -import org.apache.storm.tuple.Tuple; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.storm.tuple.Tuple; /** * Basic <code>SequenceFormat</code> implementation that uses * <code>LongWritable</code> for keys and <code>Text</code> for values. - * */ public class DefaultSequenceFormat implements SequenceFormat { private transient LongWritable key; @@ -34,7 +34,7 @@ public class DefaultSequenceFormat implements SequenceFormat { private String keyField; private String valueField; - public DefaultSequenceFormat(String keyField, String valueField){ + public DefaultSequenceFormat(String keyField, String valueField) { this.keyField = keyField; this.valueField = valueField; } @@ -51,8 +51,8 @@ public class DefaultSequenceFormat implements SequenceFormat { @Override public Writable key(Tuple tuple) { - if(this.key == null){ - this.key = new LongWritable(); + if (this.key == null) { + this.key = new LongWritable(); } this.key.set(tuple.getLongByField(this.keyField)); return this.key; @@ -60,7 +60,7 @@ public class DefaultSequenceFormat implements SequenceFormat { @Override public Writable value(Tuple tuple) { - if(this.value == null){ + if (this.value == null) { this.value = new Text(); } this.value.set(tuple.getStringByField(this.valueField)); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java index 0c6e3f0..e9a81cc 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.bolt.format; import org.apache.storm.tuple.Fields; @@ -44,7 +39,7 @@ public class DelimitedRecordFormat implements RecordFormat { * @param fields * @return */ - public DelimitedRecordFormat withFields(Fields fields){ + public DelimitedRecordFormat withFields(Fields fields) { this.fields = fields; return this; } @@ -55,7 +50,7 @@ public class DelimitedRecordFormat implements RecordFormat { * @param delimiter * @return */ - public DelimitedRecordFormat withFieldDelimiter(String delimiter){ + public DelimitedRecordFormat withFieldDelimiter(String delimiter) { this.fieldDelimiter = delimiter; return this; } @@ -66,7 +61,7 @@ public class DelimitedRecordFormat implements RecordFormat { * @param delimiter * @return */ - public DelimitedRecordFormat withRecordDelimiter(String delimiter){ + public DelimitedRecordFormat withRecordDelimiter(String delimiter) { this.recordDelimiter = delimiter; return this; } @@ -76,9 +71,9 @@ public class DelimitedRecordFormat implements RecordFormat { StringBuilder sb = new StringBuilder(); Fields fields = this.fields == null ? tuple.getFields() : this.fields; int size = fields.size(); - for(int i = 0; i < size; i++){ + for (int i = 0; i < size; i++) { sb.append(tuple.getValueByField(fields.get(i))); - if(i != size - 1){ + if (i != size - 1) { sb.append(this.fieldDelimiter); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java index 9ef2851..891c600 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java @@ -1,26 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hdfs.bolt.format; -import org.apache.storm.task.TopologyContext; +package org.apache.storm.hdfs.bolt.format; import java.io.Serializable; import java.util.Map; +import org.apache.storm.task.TopologyContext; /** * Formatter interface for determining HDFS file names. http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java index fe48f05..8d55c03 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java @@ -1,26 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hdfs.bolt.format; +package org.apache.storm.hdfs.bolt.format; -import org.apache.storm.tuple.Tuple; import java.io.Serializable; +import org.apache.storm.tuple.Tuple; /** * Formats a Tuple object into a byte array http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java index fcb7f45..7c38f66 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java @@ -15,15 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.hdfs.bolt.format; -import org.apache.storm.tuple.Tuple; +package org.apache.storm.hdfs.bolt.format; import java.io.Serializable; +import org.apache.storm.tuple.Tuple; /** * Interface for converting <code>Tuple</code> objects to HDFS sequence file key-value pairs. - * */ public interface SequenceFormat extends Serializable { /** @@ -35,6 +34,7 @@ public interface SequenceFormat extends Serializable { /** * Value class used by implementation (e.g. Text.class, etc.) + * * @return */ Class valueClass(); @@ -49,6 +49,7 @@ public interface SequenceFormat extends Serializable { /** * Given a tuple, return the value that should be written to the sequence file. + * * @param tuple * @return */ http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java index cb37bdc..7869d69 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java @@ -1,27 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.bolt.format; import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; - import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Utils; @@ -41,11 +35,11 @@ public class SimpleFileNameFormat implements FileNameFormat { // compile parameters SimpleDateFormat dateFormat = new SimpleDateFormat(timeFormat); String ret = name - .replace("$TIME", dateFormat.format(new Date(timeStamp))) - .replace("$NUM", String.valueOf(rotation)) - .replace("$HOST", host) - .replace("$COMPONENT", componentId) - .replace("$TASK", String.valueOf(taskId)); + .replace("$TIME", dateFormat.format(new Date(timeStamp))) + .replace("$NUM", String.valueOf(rotation)) + .replace("$HOST", host) + .replace("$COMPONENT", componentId) + .replace("$TASK", String.valueOf(taskId)); return ret; } @@ -78,7 +72,7 @@ public class SimpleFileNameFormat implements FileNameFormat { * $HOST - local host name<br/> * $COMPONENT - component id<br/> * $TASK - task id<br/> - * + * * @param name * file name * @return @@ -90,10 +84,10 @@ public class SimpleFileNameFormat implements FileNameFormat { public SimpleFileNameFormat withTimeFormat(String timeFormat) { //check format - try{ + try { new SimpleDateFormat(timeFormat); - }catch (Exception e) { - throw new IllegalArgumentException("invalid timeFormat: "+e.getMessage()); + } catch (Exception e) { + throw new IllegalArgumentException("invalid timeFormat: " + e.getMessage()); } this.timeFormat = timeFormat; return this; http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java index aeb63fa..6354ae7 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java @@ -1,26 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hdfs.bolt.rotation; +package org.apache.storm.hdfs.bolt.rotation; -import org.apache.storm.tuple.Tuple; import java.io.Serializable; +import org.apache.storm.tuple.Tuple; /** * Used by the HdfsBolt to decide when to rotate files. http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java index 5fb9bbc..230e8b4 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.bolt.rotation; @@ -36,32 +31,11 @@ import org.slf4j.LoggerFactory; */ public class FileSizeRotationPolicy implements FileRotationPolicy { private static final Logger LOG = LoggerFactory.getLogger(FileSizeRotationPolicy.class); - - public static enum Units { - - KB((long)Math.pow(2, 10)), - MB((long)Math.pow(2, 20)), - GB((long)Math.pow(2, 30)), - TB((long)Math.pow(2, 40)); - - private long byteCount; - - private Units(long byteCount){ - this.byteCount = byteCount; - } - - public long getByteCount(){ - return byteCount; - } - } - private long maxBytes; - private long lastOffset = 0; private long currentBytesWritten = 0; - - public FileSizeRotationPolicy(float count, Units units){ - this.maxBytes = (long)(count * units.getByteCount()); + public FileSizeRotationPolicy(float count, Units units) { + this.maxBytes = (long) (count * units.getByteCount()); } protected FileSizeRotationPolicy(long maxBytes) { @@ -86,4 +60,22 @@ public class FileSizeRotationPolicy implements FileRotationPolicy { public FileRotationPolicy copy() { return new FileSizeRotationPolicy(this.maxBytes); } + + public static enum Units { + + KB((long) Math.pow(2, 10)), + MB((long) Math.pow(2, 20)), + GB((long) Math.pow(2, 30)), + TB((long) Math.pow(2, 40)); + + private long byteCount; + + private Units(long byteCount) { + this.byteCount = byteCount; + } + + public long getByteCount() { + return byteCount; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java index a00037b..f25be14 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.bolt.rotation; import org.apache.storm.tuple.Tuple; http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java index 06fada8..e600831 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java @@ -15,34 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.hdfs.bolt.rotation; import org.apache.storm.tuple.Tuple; public class TimedRotationPolicy implements FileRotationPolicy { - public static enum TimeUnit { - - SECONDS((long)1000), - MINUTES((long)1000*60), - HOURS((long)1000*60*60), - DAYS((long)1000*60*60*24); - - private long milliSeconds; - - private TimeUnit(long milliSeconds){ - this.milliSeconds = milliSeconds; - } - - public long getMilliSeconds(){ - return milliSeconds; - } - } - private long interval; - public TimedRotationPolicy(float count, TimeUnit units){ - this.interval = (long)(count * units.getMilliSeconds()); + public TimedRotationPolicy(float count, TimeUnit units) { + this.interval = (long) (count * units.getMilliSeconds()); } protected TimedRotationPolicy(long interval) { @@ -74,7 +57,25 @@ public class TimedRotationPolicy implements FileRotationPolicy { return new TimedRotationPolicy(this.interval); } - public long getInterval(){ + public long getInterval() { return this.interval; } + + public static enum TimeUnit { + + SECONDS((long) 1000), + MINUTES((long) 1000 * 60), + HOURS((long) 1000 * 60 * 60), + DAYS((long) 1000 * 60 * 60 * 24); + + private long milliSeconds; + + private TimeUnit(long milliSeconds) { + this.milliSeconds = milliSeconds; + } + + public long getMilliSeconds() { + return milliSeconds; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java index 9f31d58..45abc7d 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.bolt.sync; @@ -29,7 +24,7 @@ public class CountSyncPolicy implements SyncPolicy { private int count; private int executeCount = 0; - public CountSyncPolicy(int count){ + public CountSyncPolicy(int count) { this.count = count; } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/SyncPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/SyncPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/SyncPolicy.java index 19c0e25..12c673f 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/SyncPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/SyncPolicy.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hdfs.bolt.sync; -import org.apache.storm.tuple.Tuple; +package org.apache.storm.hdfs.bolt.sync; import java.io.Serializable; +import org.apache.storm.tuple.Tuple; /** * Interface for controlling when the HdfsBolt
