http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java index c69f312..ad9d7aa 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hdfs.trident.rotation; -import org.apache.storm.trident.tuple.TridentTuple; +package org.apache.storm.hdfs.trident.rotation; import java.io.Serializable; +import org.apache.storm.trident.tuple.TridentTuple; /** * Used by the HdfsBolt to decide when to rotate files. @@ -44,7 +38,7 @@ public interface FileRotationPolicy extends Serializable { /** * Check if a file rotation should be performed based on * the offset at which file is being written. - * + * * @param offset the current offset of file being written * @return true if a file rotation should be performed. */
http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java index 70865b2..2a512c4 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java @@ -1,26 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.trident.rotation; +import org.apache.storm.trident.tuple.TridentTuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.trident.tuple.TridentTuple; /** * File rotation policy that will rotate files when a certain @@ -36,32 +31,11 @@ import org.apache.storm.trident.tuple.TridentTuple; */ public class FileSizeRotationPolicy implements FileRotationPolicy { private static final Logger LOG = LoggerFactory.getLogger(FileSizeRotationPolicy.class); - - public static enum Units { - - KB((long)Math.pow(2, 10)), - MB((long)Math.pow(2, 20)), - GB((long)Math.pow(2, 30)), - TB((long)Math.pow(2, 40)); - - private long byteCount; - - private Units(long byteCount){ - this.byteCount = byteCount; - } - - public long getByteCount(){ - return byteCount; - } - } - private long maxBytes; - private long lastOffset = 0; private long currentBytesWritten = 0; - - public FileSizeRotationPolicy(float count, Units units){ - this.maxBytes = (long)(count * units.getByteCount()); + public FileSizeRotationPolicy(float count, Units units) { + this.maxBytes = (long) (count * units.getByteCount()); } @Override @@ -91,4 +65,22 @@ public class FileSizeRotationPolicy implements FileRotationPolicy { public long getMaxBytes() { return maxBytes; } + + public static enum Units { + + KB((long) Math.pow(2, 10)), + MB((long) Math.pow(2, 20)), + GB((long) Math.pow(2, 30)), + TB((long) Math.pow(2, 40)); + + private long byteCount; + + private Units(long byteCount) { + this.byteCount = byteCount; + } + + public long getByteCount() { + return byteCount; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java index 4c65acd..f6fedb9 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.trident.rotation; import org.apache.storm.trident.tuple.TridentTuple; http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java index 7278c9a..2508a07 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java @@ -15,43 +15,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.hdfs.trident.rotation; -import org.apache.storm.trident.tuple.TridentTuple; +package org.apache.storm.hdfs.trident.rotation; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.storm.trident.tuple.TridentTuple; public class TimedRotationPolicy implements FileRotationPolicy { - public static enum TimeUnit { - - SECONDS((long)1000), - MINUTES((long)1000*60), - HOURS((long)1000*60*60), - DAYS((long)1000*60*60*24); - - private long milliSeconds; - - private TimeUnit(long milliSeconds){ - this.milliSeconds = milliSeconds; - } - - public long getMilliSeconds(){ - return milliSeconds; - } - } - private long interval; private Timer rotationTimer; private AtomicBoolean rotationTimerTriggered = new AtomicBoolean(); - - - public TimedRotationPolicy(float count, TimeUnit units){ - this.interval = (long)(count * units.getMilliSeconds()); + public TimedRotationPolicy(float count, TimeUnit units) { + this.interval = (long) (count * units.getMilliSeconds()); } + /** * Called for every tuple the HdfsBolt executes. * @@ -77,7 +58,7 @@ public class TimedRotationPolicy implements FileRotationPolicy { rotationTimerTriggered.set(false); } - public long getInterval(){ + public long getInterval() { return this.interval; } @@ -95,4 +76,22 @@ public class TimedRotationPolicy implements FileRotationPolicy { }; rotationTimer.scheduleAtFixedRate(task, interval, interval); } + + public static enum TimeUnit { + + SECONDS((long) 1000), + MINUTES((long) 1000 * 60), + HOURS((long) 1000 * 60 * 60), + DAYS((long) 1000 * 60 * 60 * 24); + + private long milliSeconds; + + private TimeUnit(long milliSeconds) { + this.milliSeconds = milliSeconds; + } + + public long getMilliSeconds() { + return milliSeconds; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java index 3b4e32e..15f0c6b 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.trident.sync; @@ -29,7 +24,7 @@ public class CountSyncPolicy implements SyncPolicy { private int count; private int executeCount = 0; - public CountSyncPolicy(int count){ + public CountSyncPolicy(int count) { this.count = count; } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/SyncPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/SyncPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/SyncPolicy.java index d87da9c..ba39724 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/SyncPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/SyncPolicy.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hdfs.trident.sync; -import org.apache.storm.trident.tuple.TridentTuple; +package org.apache.storm.hdfs.trident.sync; import java.io.Serializable; +import org.apache.storm.trident.tuple.TridentTuple; /** * Interface for controlling when the HdfsBolt http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java index b588f5b..5cc7753 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java @@ -1,43 +1,41 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.avro; import org.apache.avro.Schema; import org.junit.Assert; -import org.junit.Test; - import org.junit.BeforeClass; +import org.junit.Test; public class TestFixedAvroSerializer { //These should match FixedAvroSerializer.config in the test resources private static final String schemaString1 = "{\"type\":\"record\"," + - "\"name\":\"stormtest1\"," + - "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + - "{ \"name\":\"int1\", \"type\":\"int\" }]}"; + "\"name\":\"stormtest1\"," + + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; private static final String schemaString2 = "{\"type\":\"record\"," + - "\"name\":\"stormtest2\"," + - "\"fields\":[{\"name\":\"foobar1\",\"type\":\"string\"}," + - "{ \"name\":\"intint1\", \"type\":\"int\" }]}"; + "\"name\":\"stormtest2\"," + + "\"fields\":[{\"name\":\"foobar1\",\"type\":\"string\"}," + + "{ \"name\":\"intint1\", \"type\":\"int\" }]}"; private static Schema schema1; private static Schema schema2; final AvroSchemaRegistry reg; + public TestFixedAvroSerializer() throws Exception { + reg = new FixedAvroSerializer(); + } + @BeforeClass public static void setupClass() { @@ -48,17 +46,13 @@ public class TestFixedAvroSerializer { schema2 = parser.parse(schemaString2); } - public TestFixedAvroSerializer() throws Exception{ - reg = new FixedAvroSerializer(); - } - @Test public void testSchemas() { testTheSchema(schema1); testTheSchema(schema2); } - @Test + @Test public void testDifferentFPs() { String fp1 = reg.getFingerprint(schema1); String fp2 = reg.getFingerprint(schema2); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java index fb97782..cf02dcb 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.avro; import org.apache.avro.Schema; @@ -24,13 +19,13 @@ import org.junit.Test; public class TestGenericAvroSerializer { private static final String schemaString1 = "{\"type\":\"record\"," + - "\"name\":\"stormtest1\"," + - "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + - "{ \"name\":\"int1\", \"type\":\"int\" }]}"; + "\"name\":\"stormtest1\"," + + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; private static final String schemaString2 = "{\"type\":\"record\"," + - "\"name\":\"stormtest2\"," + - "\"fields\":[{\"name\":\"foobar1\",\"type\":\"string\"}," + - "{ \"name\":\"intint1\", \"type\":\"int\" }]}"; + "\"name\":\"stormtest2\"," + + "\"fields\":[{\"name\":\"foobar1\",\"type\":\"string\"}," + + "{ \"name\":\"intint1\", \"type\":\"int\" }]}"; private static Schema schema1; private static Schema schema2; @@ -52,7 +47,7 @@ public class TestGenericAvroSerializer { testTheSchema(schema2); } - @Test + @Test public void testDifferentFPs() { String fp1 = reg.getFingerprint(schema1); String fp2 = reg.getFingerprint(schema2); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java index cac55c9..a919c92 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java @@ -1,72 +1,77 @@ - /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.bolt; -import org.apache.storm.Config; -import org.apache.storm.task.GeneralTopologyContext; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.TupleImpl; -import org.apache.storm.tuple.Values; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.HashMap; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.io.DatumReader; -import org.apache.avro.file.DataFileReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.storm.Config; import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; -import org.junit.Before; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; +import org.apache.storm.task.GeneralTopologyContext; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; import org.junit.After; -import org.junit.Test; import org.junit.Assert; - -import org.mockito.Mock; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FSDataInputStream; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.HashMap; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.storm.hdfs.testing.MiniDFSClusterRule; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; +import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class AvroGenericRecordBoltTest { + private static final String testRoot = "/unittest"; + private static final String schemaV1 = "{\"type\":\"record\"," + + "\"name\":\"myrecord\"," + + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; + private static final String schemaV2 = "{\"type\":\"record\"," + + "\"name\":\"myrecord\"," + + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + + "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" }," + + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; + private static Schema schema1; + private static Schema schema2; + private static Tuple tuple1; + private static Tuple tuple2; @Rule public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(() -> { Configuration conf = new Configuration(); @@ -81,24 +86,8 @@ public class AvroGenericRecordBoltTest { private OutputCollector collector; @Mock private TopologyContext topologyContext; - private DistributedFileSystem fs; private String hdfsURI; - private static final String testRoot = "/unittest"; - private static Schema schema1; - private static Schema schema2; - private static Tuple tuple1; - private static Tuple tuple2; - private static final String schemaV1 = "{\"type\":\"record\"," - + "\"name\":\"myrecord\"," - + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," - + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; - - private static final String schemaV2 = "{\"type\":\"record\"," - + "\"name\":\"myrecord\"," - + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," - + "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" }," - + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; @BeforeClass public static void setupClass() { @@ -119,6 +108,18 @@ public class AvroGenericRecordBoltTest { tuple2 = generateTestTuple(builder2.build()); } + private static Tuple generateTestTuple(GenericRecord record) { + TopologyBuilder builder = new TopologyBuilder(); + GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), + new Config(), new HashMap(), new HashMap(), new HashMap(), "") { + @Override + public Fields getComponentOutputFields(String componentId, String streamId) { + return new Fields("record"); + } + }; + return new TupleImpl(topologyContext, new Values(record), topologyContext.getComponentId(1), 1, ""); + } + @Before public void setup() throws Exception { fs = dfsClusterRule.getDfscluster().getFileSystem(); @@ -220,18 +221,6 @@ public class AvroGenericRecordBoltTest { .withSyncPolicy(fieldsSyncPolicy); } - private static Tuple generateTestTuple(GenericRecord record) { - TopologyBuilder builder = new TopologyBuilder(); - GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), - new Config(), new HashMap(), new HashMap(), new HashMap(), "") { - @Override - public Fields getComponentOutputFields(String componentId, String streamId) { - return new Fields("record"); - } - }; - return new TupleImpl(topologyContext, new Values(record), topologyContext.getComponentId(1), 1, ""); - } - private void verifyAllAvroFiles(String path) throws IOException { Path p = new Path(path); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java index b872029..32844e7 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java @@ -1,32 +1,29 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.bolt; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.storm.Config; -import org.apache.storm.task.GeneralTopologyContext; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.TupleImpl; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.MockTupleHelpers; import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat; import org.apache.storm.hdfs.bolt.format.FileNameFormat; @@ -36,37 +33,33 @@ import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; import org.apache.storm.hdfs.common.Partitioner; -import org.junit.Before; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; +import org.apache.storm.task.GeneralTopologyContext; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.MockTupleHelpers; import org.junit.After; +import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.Assert; - import org.junit.rules.ExpectedException; -import org.mockito.Mock; - -import static org.mockito.Mockito.*; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import org.apache.storm.hdfs.testing.MiniDFSClusterRule; import org.junit.runner.RunWith; -import org.mockito.Mockito; +import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; + @RunWith(MockitoJUnitRunner.class) public class TestHdfsBolt { + private static final String testRoot = "/unittest"; @Rule public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(() -> { Configuration conf = new Configuration(); @@ -77,16 +70,16 @@ public class TestHdfsBolt { conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); return conf; }); - - private String hdfsURI; - private DistributedFileSystem fs; - private static final String testRoot = "/unittest"; + @Rule + public ExpectedException thrown = ExpectedException.none(); Tuple tuple1 = generateTestTuple(1, "First Tuple", "SFO", "CA"); Tuple tuple2 = generateTestTuple(1, "Second Tuple", "SJO", "CA"); - - @Mock private OutputCollector collector; - @Mock private TopologyContext topologyContext; - @Rule public ExpectedException thrown = ExpectedException.none(); + private String hdfsURI; + private DistributedFileSystem fs; + @Mock + private OutputCollector collector; + @Mock + private TopologyContext topologyContext; @Before public void setup() throws Exception { @@ -138,8 +131,7 @@ public class TestHdfsBolt { } @Test - public void testTwoTuplesOneFile() throws IOException - { + public void testTwoTuplesOneFile() throws IOException { HdfsBolt bolt = makeHdfsBolt(hdfsURI, 2, 10000f); bolt.prepare(new Config(), topologyContext, collector); bolt.execute(tuple1); @@ -154,8 +146,7 @@ public class TestHdfsBolt { } @Test - public void testFailedSync() throws IOException - { + public void testFailedSync() throws IOException { HdfsBolt bolt = makeHdfsBolt(hdfsURI, 2, 10000f); bolt.prepare(new Config(), topologyContext, collector); bolt.execute(tuple1); @@ -171,27 +162,23 @@ public class TestHdfsBolt { // One tuple and one rotation should yield one file with data // The failed executions should not cause rotations and any new files @Test - public void testFailureFilecount() throws IOException, InterruptedException - { + public void testFailureFilecount() throws IOException, InterruptedException { HdfsBolt bolt = makeHdfsBolt(hdfsURI, 1, .000001f); bolt.prepare(new Config(), topologyContext, collector); bolt.execute(tuple1); fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); - try - { + try { bolt.execute(tuple2); } catch (RuntimeException e) { // } - try - { + try { bolt.execute(tuple2); } catch (RuntimeException e) { // } - try - { + try { bolt.execute(tuple2); } catch (RuntimeException e) { // @@ -202,8 +189,7 @@ public class TestHdfsBolt { } @Test - public void testTickTuples() throws IOException - { + public void testTickTuples() throws IOException { HdfsBolt bolt = makeHdfsBolt(hdfsURI, 10, 10000f); bolt.prepare(new Config(), topologyContext, collector); @@ -230,28 +216,29 @@ public class TestHdfsBolt { SyncPolicy fieldsSyncPolicy = new CountSyncPolicy(countSync); FileRotationPolicy fieldsRotationPolicy = - new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB); + new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB); FileNameFormat fieldsFileNameFormat = new DefaultFileNameFormat().withPath(testRoot); return new HdfsBolt() - .withFsUrl(nameNodeAddr) - .withFileNameFormat(fieldsFileNameFormat) - .withRecordFormat(fieldsFormat) - .withRotationPolicy(fieldsRotationPolicy) - .withSyncPolicy(fieldsSyncPolicy); + .withFsUrl(nameNodeAddr) + .withFileNameFormat(fieldsFileNameFormat) + .withRecordFormat(fieldsFormat) + .withRotationPolicy(fieldsRotationPolicy) + .withSyncPolicy(fieldsSyncPolicy); } - private Tuple generateTestTuple(Object id, Object msg,Object city,Object state) { + private Tuple generateTestTuple(Object id, Object msg, Object city, Object state) { TopologyBuilder builder = new TopologyBuilder(); GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), - new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(), "") { + new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(), + "") { @Override public Fields getComponentOutputFields(String componentId, String streamId) { - return new Fields("id", "msg","city","state"); + return new Fields("id", "msg", "city", "state"); } }; - return new TupleImpl(topologyContext, new Values(id, msg,city,state), topologyContext.getComponentId(1), 1, ""); + return new TupleImpl(topologyContext, new Values(id, msg, city, state), topologyContext.getComponentId(1), 1, ""); } // Generally used to compare how files were actually written and compare to expectations based on total http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java index 71f5d4e..901f597 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java @@ -1,26 +1,37 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hdfs.bolt; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; +package org.apache.storm.hdfs.bolt; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.storm.Config; +import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; +import org.apache.storm.hdfs.bolt.format.DefaultSequenceFormat; +import org.apache.storm.hdfs.bolt.format.FileNameFormat; +import org.apache.storm.hdfs.bolt.format.SequenceFormat; +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; +import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; +import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; +import org.apache.storm.hdfs.bolt.sync.SyncPolicy; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -29,44 +40,29 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.TupleImpl; import org.apache.storm.tuple.Values; -import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; -import org.apache.storm.hdfs.bolt.format.FileNameFormat; -import org.apache.storm.hdfs.bolt.format.SequenceFormat; -import org.apache.storm.hdfs.bolt.format.DefaultSequenceFormat; -import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; -import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; -import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; -import org.apache.storm.hdfs.bolt.sync.SyncPolicy; -import org.junit.*; - +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; import org.mockito.Mock; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.mockito.runners.MockitoJUnitRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import org.apache.storm.hdfs.testing.MiniDFSClusterRule; -import org.junit.runner.RunWith; -import org.mockito.runners.MockitoJUnitRunner; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; @RunWith(MockitoJUnitRunner.class) public class TestSequenceFileBolt { private static final Logger LOG = LoggerFactory.getLogger(TestSequenceFileBolt.class); - + private static final String testRoot = "/unittest"; @Rule public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(() -> { - Configuration conf = new Configuration(); + Configuration conf = new Configuration(); conf.set("fs.trash.interval", "10"); conf.setBoolean("dfs.permissions", true); File baseDir = new File("./target/hdfs/").getAbsoluteFile(); @@ -74,16 +70,16 @@ public class TestSequenceFileBolt { conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); return conf; }); - - private String hdfsURI; - private DistributedFileSystem fs; - private static final String testRoot = "/unittest"; + @Rule + public ExpectedException thrown = ExpectedException.none(); Tuple tuple1 = generateTestTuple(1l, "first tuple"); Tuple tuple2 = generateTestTuple(2l, "second tuple"); - - @Mock private OutputCollector collector; - @Mock private TopologyContext topologyContext; - @Rule public ExpectedException thrown = ExpectedException.none(); + private String hdfsURI; + private DistributedFileSystem fs; + @Mock + private OutputCollector collector; + @Mock + private TopologyContext topologyContext; @Before public void setup() throws Exception { @@ -111,8 +107,7 @@ public class TestSequenceFileBolt { } @Test - public void testTwoTuplesOneFile() throws IOException - { + public void testTwoTuplesOneFile() throws IOException { SequenceFileBolt bolt = makeSeqBolt(hdfsURI, 2, 10000f); bolt.prepare(new Config(), topologyContext, collector); bolt.execute(tuple1); @@ -127,8 +122,7 @@ public class TestSequenceFileBolt { } @Test - public void testFailedSync() throws IOException - { + public void testFailedSync() throws IOException { SequenceFileBolt bolt = makeSeqBolt(hdfsURI, 2, 10000f); bolt.prepare(new Config(), topologyContext, collector); bolt.execute(tuple1); @@ -144,24 +138,25 @@ public class TestSequenceFileBolt { SyncPolicy fieldsSyncPolicy = new CountSyncPolicy(countSync); FileRotationPolicy fieldsRotationPolicy = - new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB); + new FileSizeRotationPolicy(rotationSizeMB, FileSizeRotationPolicy.Units.MB); FileNameFormat fieldsFileNameFormat = new DefaultFileNameFormat().withPath(testRoot); SequenceFormat seqFormat = new DefaultSequenceFormat("key", "value"); return new SequenceFileBolt() - .withFsUrl(nameNodeAddr) - .withFileNameFormat(fieldsFileNameFormat) - .withRotationPolicy(fieldsRotationPolicy) - .withSequenceFormat(seqFormat) - .withSyncPolicy(fieldsSyncPolicy); + .withFsUrl(nameNodeAddr) + .withFileNameFormat(fieldsFileNameFormat) + .withRotationPolicy(fieldsRotationPolicy) + .withSequenceFormat(seqFormat) + .withSyncPolicy(fieldsSyncPolicy); } private Tuple generateTestTuple(Long key, String value) { TopologyBuilder builder = new TopologyBuilder(); GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), - new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(), "") { + new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(), + "") { @Override public Fields getComponentOutputFields(String componentId, String streamId) { return new Fields("key", "value"); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java index 9554aba..2fecbe0 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.hdfs.bolt; @@ -34,8 +28,8 @@ public class TestWritersMap { AbstractHDFSWriterMock bar = new AbstractHDFSWriterMock(new FileSizeRotationPolicy(1, FileSizeRotationPolicy.Units.KB), null); AbstractHDFSWriterMock baz = new AbstractHDFSWriterMock(new FileSizeRotationPolicy(1, FileSizeRotationPolicy.Units.KB), null); - @Test public void testLRUBehavior() - { + @Test + public void testLRUBehavior() { map.put("FOO", foo); map.put("BAR", bar); http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java index 64f5dd8..fd586db 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java @@ -1,27 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.hdfs.bolt.format; import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.Map; - import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Utils; import org.junit.Assert; @@ -40,7 +34,7 @@ public class TestSimpleFileNameFormat { Assert.assertEquals("/storm", path); String time = new SimpleDateFormat("yyyyMMddHHmmss").format(now); - Assert.assertEquals(time+".1.txt", name); + Assert.assertEquals(time + ".1.txt", name); } @Test @@ -54,7 +48,7 @@ public class TestSimpleFileNameFormat { long now = System.currentTimeMillis(); String path = format.getPath(); String name = format.getName(1, now); - + Assert.assertEquals("/mypath", path); String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(now); String host = null; @@ -63,20 +57,21 @@ public class TestSimpleFileNameFormat { } catch (UnknownHostException e) { e.printStackTrace(); } - Assert.assertEquals(time+"."+host+".Xcom.7.1.txt", name); + Assert.assertEquals(time + "." + host + ".Xcom.7.1.txt", name); } - @Test(expected=IllegalArgumentException.class) + @Test(expected = IllegalArgumentException.class) public void testTimeFormat() { Map<String, Object> topoConf = new HashMap(); SimpleFileNameFormat format = new SimpleFileNameFormat() - .withTimeFormat("xyz"); + .withTimeFormat("xyz"); format.prepare(null, createTopologyContext(topoConf)); } - - private TopologyContext createTopologyContext(Map<String, Object> topoConf){ - Map<Integer, String> taskToComponent = new HashMap<Integer, String>(); + + private TopologyContext createTopologyContext(Map<String, Object> topoConf) { + Map<Integer, String> taskToComponent = new HashMap<Integer, String>(); taskToComponent.put(7, "Xcom"); - return new TopologyContext(null, topoConf, taskToComponent, null, null, null, null, null, null, 7, 6703, null, null, null, null, null, null); + return new TopologyContext(null, topoConf, taskToComponent, null, null, null, null, null, null, 7, 6703, null, null, null, null, + null, null); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/ConfigsTest.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/ConfigsTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/ConfigsTest.java index 6244466..c8c7620 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/ConfigsTest.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/ConfigsTest.java @@ -1,32 +1,37 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.hdfs.spout; -import static org.junit.Assert.*; +package org.apache.storm.hdfs.spout; import java.util.HashMap; import java.util.Map; - import org.apache.storm.validation.ConfigValidation; import org.junit.Test; +import static org.junit.Assert.fail; + public class ConfigsTest { + public static void verifyBad(String key, Object value) { + Map<String, Object> conf = new HashMap<>(); + conf.put(key, value); + try { + ConfigValidation.validateFields(conf); + fail("Expected " + key + " = " + value + " to throw Exception, but it didn't"); + } catch (IllegalArgumentException e) { + //good + } + } + @SuppressWarnings("deprecation") @Test public void testGood() { @@ -67,17 +72,6 @@ public class ConfigsTest { ConfigValidation.validateFields(conf); } - public static void verifyBad(String key, Object value) { - Map<String, Object> conf = new HashMap<>(); - conf.put(key, value); - try { - ConfigValidation.validateFields(conf); - fail("Expected "+key+" = "+ value + " to throw Exception, but it didn't"); - } catch (IllegalArgumentException e) { - //good - } - } - @SuppressWarnings("deprecation") @Test public void testBad() { http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java index 4016c55..46f7892 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java @@ -1,12 +1,7 @@ - /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> @@ -14,31 +9,29 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions * and limitations under the License. */ + package org.apache.storm.hdfs.spout; +import java.io.IOException; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import org.apache.storm.hdfs.testing.MiniDFSClusterRule; import org.junit.Rule; +import org.junit.Test; public class TestDirLock { + private static final int LOCK_EXPIRY_SEC = 1; + private final Path locksDir = new Path("/tmp/lockdir"); @Rule public MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule(); - - private static final int LOCK_EXPIRY_SEC = 1; - private FileSystem fs; private HdfsConfiguration conf = new HdfsConfiguration(); - private final Path locksDir = new Path("/tmp/lockdir"); @Before public void setUp() throws IOException { @@ -92,7 +85,7 @@ public class TestDirLock { for (DirLockingThread thread : threads) { thread.interrupt(); thread.join(30_000); - if(thread.isAlive()) { + if (thread.isAlive()) { throw new RuntimeException("Failed to stop threads within 30 seconds, threads may leak into other tests"); } } @@ -134,10 +127,10 @@ public class TestDirLock { class DirLockingThread extends Thread { - private int thdNum; private final FileSystem fs; private final Path dir; public boolean cleanExit = false; + private int thdNum; public DirLockingThread(int thdNum, FileSystem fs, Path dir) throws IOException { http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java index df131a4..1a30bfa 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java @@ -1,12 +1,7 @@ - /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> @@ -14,38 +9,42 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions * and limitations under the License. */ + package org.apache.storm.hdfs.spout; +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Method; +import java.util.ArrayList; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.storm.hdfs.common.HdfsUtils; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Test; - -import java.io.BufferedReader; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.lang.reflect.Method; -import java.util.ArrayList; -import org.apache.storm.hdfs.testing.MiniDFSClusterRule; import org.junit.Rule; +import org.junit.Test; public class TestFileLock { + private final Path filesDir = new Path("/tmp/filesdir"); + private final Path locksDir = new Path("/tmp/locksdir"); @Rule public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(); - private FileSystem fs; private HdfsConfiguration conf = new HdfsConfiguration(); - private final Path filesDir = new Path("/tmp/filesdir"); - private final Path locksDir = new Path("/tmp/locksdir"); + public static void closeUnderlyingLockFile(FileLock lock) throws ReflectiveOperationException { + Method m = FileLock.class.getDeclaredMethod("forceCloseLockFile"); + m.setAccessible(true); + m.invoke(lock); + } @Before public void setup() throws IOException { @@ -307,12 +306,6 @@ public class TestFileLock { } } - public static void closeUnderlyingLockFile(FileLock lock) throws ReflectiveOperationException { - Method m = FileLock.class.getDeclaredMethod("forceCloseLockFile"); - m.setAccessible(true); - m.invoke(lock); - } - /** * return null if file not found */ @@ -340,9 +333,9 @@ public class TestFileLock { class FileLockingThread extends Thread { - private int thdNum; private final FileSystem fs; public boolean cleanExit = false; + private int thdNum; private Path fileToLock; private Path locksDir; private String spoutId; http://git-wip-us.apache.org/repos/asf/storm/blob/7da98cf0/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java index 8395d27..1a278c1 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java @@ -1,12 +1,7 @@ - /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> @@ -14,12 +9,10 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions * and limitations under the License. */ -package org.apache.storm.hdfs.spout; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.notNull; +package org.apache.storm.hdfs.spout; +import java.io.IOException; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -28,24 +21,24 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.ipc.RemoteException; +import org.apache.storm.hdfs.testing.MiniDFSClusterRule; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; -import java.io.IOException; -import org.apache.storm.hdfs.testing.MiniDFSClusterRule; -import org.junit.Rule; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.notNull; public class TestHdfsSemantics { + private final HdfsConfiguration conf = new HdfsConfiguration(); + private final Path dir = new Path("/tmp/filesdir"); @Rule public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(); - private FileSystem fs; - private final HdfsConfiguration conf = new HdfsConfiguration(); - - private final Path dir = new Path("/tmp/filesdir"); @Before public void setup() throws IOException { @@ -63,7 +56,7 @@ public class TestHdfsSemantics { @Test public void testDeleteSemantics() throws Exception { Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1"); -// try { + // try { // 1) Delete absent file - should return false Assert.assertFalse(fs.exists(file)); try { @@ -130,7 +123,7 @@ public class TestHdfsSemantics { } //2 try to append to a closed file - try (FSDataOutputStream os2 = fs.append(file1)) { + try (FSDataOutputStream os2 = fs.append(file1)) { assertThat(os2, notNull()); } }
