http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStorageTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStorageTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStorageTest.java new file mode 100644 index 0000000..d0c27f7 --- /dev/null +++ b/flume/src/test/java/org/apache/apex/malhar/flume/storage/HDFSStorageTest.java @@ -0,0 +1,695 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.storage; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.flume.Context; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.datatorrent.netlet.util.Slice; + +/** + * + */ +public class HDFSStorageTest +{ + public static class TestMeta extends TestWatcher + { + public String baseDir; + public String testFile; + private String testData = "No and yes. There is also IdleTimeHandler that allows the operator to emit tuples. " + + "There is overlap, why not have a single interface. \n" + + "Also consider the possibility of an operator that does other processing and not consume nor emit tuples,"; + + @Override + protected void starting(org.junit.runner.Description description) + { + String className = description.getClassName(); + baseDir = "target/" + className; + try { + baseDir = (new File(baseDir)).getAbsolutePath(); + FileUtils.forceMkdir(new File(baseDir)); + testFile = baseDir + "/testInput.txt"; + FileOutputStream outputStream = FileUtils.openOutputStream(new File(testFile)); + outputStream.write(testData.getBytes()); + outputStream.close(); + + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Override + protected void finished(Description description) + { + try { + FileUtils.deleteDirectory(new File(baseDir)); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + private String STORAGE_DIRECTORY; + + private HDFSStorage getStorage(String id, boolean restore) + { + Context ctx = new Context(); + STORAGE_DIRECTORY = testMeta.baseDir; + ctx.put(HDFSStorage.BASE_DIR_KEY, testMeta.baseDir); + ctx.put(HDFSStorage.RESTORE_KEY, Boolean.toString(restore)); + ctx.put(HDFSStorage.ID, id); + ctx.put(HDFSStorage.BLOCKSIZE, "256"); + HDFSStorage lstorage = new HDFSStorage(); + lstorage.configure(ctx); + lstorage.setup(null); + return lstorage; + } + + private HDFSStorage storage; + + @Before + public void setup() + { + storage = getStorage("1", false); + } + + @After + public void teardown() + { + storage.teardown(); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + storage.cleanHelperFiles(); + } + + /** + * This test covers following use case 1. Some data is stored 2. File is flush but the file is not close 3. Some more + * data is stored but the file doesn't roll-overs 4. Retrieve is called for the last returned address and it return + * nulls 5. Some more data is stored again but the address is returned null because of previous retrieve call + * + * @throws Exception + */ + @Test + public void testPartialFlush() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = "ab".getBytes(); + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + b = "cb".getBytes(); + byte[] addr = storage.store(new Slice(b, 0, b.length)); + match(storage.retrieve(new byte[8]), "ab"); + Assert.assertNull(storage.retrieve(addr)); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + storage.flush(); + match(storage.retrieve(address), "cb"); + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + } + + /** + * This test covers following use case 1. Some data is stored to make sure that there is no roll over 2. File is + * flushed but the file is not closed 3. Some more data is stored. The data stored is enough to make the file roll + * over 4. Retrieve is called for the last returned address and it return nulls as the data is not flushed 5. Some + * more data is stored again but the address is returned null because of previous retrieve call 6. The data is flushed + * to make sure that the data is committed. 7. Now the data is retrieved from the starting and data returned matches + * the data stored + * + * @throws Exception + */ + @Test + public void testPartialFlushRollOver() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52, + 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49, + 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54, + 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53, + 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1, + 48, 46, 48, 1, 48, 46, 48}; + byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, + 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, + 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, + 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, + 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, + 1, 48, 46, 48, 1, 48, 46, 48}; + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + byte[] addr = null; + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + addr = storage.store(new Slice(b, 0, b.length)); + } + Assert.assertNull(storage.retrieve(addr)); + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + } + storage.flush(); + match(storage.retrieve(new byte[8]), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieve(address), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + + } + + /** + * This test covers following use case 1. Some data is stored to make sure that there is no roll over 2. File is + * flushed but the file is not closed 3. Some more data is stored. The data stored is enough to make the file roll + * over 4. The storage crashes and new storage is instiated. 5. Retrieve is called for the last returned address and + * it return nulls as the data is not flushed 6. Some more data is stored again but the address is returned null + * because of previous retrieve call 7. The data is flushed to make sure that the data is committed. 8. Now the data + * is retrieved from the starting and data returned matches the data stored + * + * @throws Exception + */ + @Test + public void testPartialFlushRollOverWithFailure() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52, + 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49, + 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54, + 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53, + 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1, + 48, 46, 48, 1, 48, 46, 48}; + byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, + 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, + 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, + 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, + 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, + 1, 48, 46, 48, 1, 48, 46, 48}; + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + byte[] addr = null; + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + addr = storage.store(new Slice(b, 0, b.length)); + } + storage = getStorage("1", true); + Assert.assertNull(storage.retrieve(addr)); + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + } + storage.flush(); + match(storage.retrieve(new byte[8]), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieve(address), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + + } + + /** + * This tests clean when the file doesn't roll over + * + * @throws Exception + */ + @Test + public void testPartialFlushWithClean() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = "ab".getBytes(); + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + storage.clean(address); + b = "cb".getBytes(); + byte[] addr = storage.store(new Slice(b, 0, b.length)); + Assert.assertNull(storage.retrieve(addr)); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + storage.flush(); + match(storage.retrieve(new byte[8]), "cb"); + match(storage.retrieve(address), "cb"); + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + } + + /** + * This tests clean when the file doesn't roll over + * + * @throws Exception + */ + @Test + public void testPartialFlushWithCleanAndFailure() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = "ab".getBytes(); + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + storage.clean(address); + b = "cb".getBytes(); + byte[] addr = storage.store(new Slice(b, 0, b.length)); + storage = getStorage("1", true); + Assert.assertNull(storage.retrieve(addr)); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + storage.flush(); + match(storage.retrieve(new byte[8]), "cb"); + match(storage.retrieve(address), "cb"); + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + } + + /** + * This test covers following use case 1. Some data is stored to make sure that there is no roll over 2. File is + * flushed but the file is not closed 3. The data is cleaned till the last returned address 4. Some more data is + * stored. The data stored is enough to make the file roll over 5. Retrieve is called for the last returned address + * and it return nulls as the data is not flushed 6. Some more data is stored again but the address is returned null + * because of previous retrieve call 7. The data is flushed to make sure that the data is committed. 8. Now the data + * is retrieved from the starting and data returned matches the data stored + * + * @throws Exception + */ + @Test + public void testPartialFlushWithCleanAndRollOver() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52, + 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49, + 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54, + 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53, + 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1, + 48, 46, 48, 1, 48, 46, 48}; + byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, + 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, + 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, + 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, + 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, + 1, 48, 46, 48, 1, 48, 46, 48}; + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + storage.clean(address); + + byte[] addr = null; + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + addr = storage.store(new Slice(b, 0, b.length)); + } + Assert.assertNull(storage.retrieve(addr)); + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + } + storage.flush(); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieve(new byte[8]), new String(b_org)); + match(storage.retrieve(address), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + + } + + /** + * This tests the clean when the files are roll-over and the storage fails + * + * @throws Exception + */ + @Test + public void testPartialFlushWithCleanAndRollOverAndFailure() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, 52, + 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, 49, + 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, 54, + 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, 53, + 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, 1, + 48, 46, 48, 1, 48, 46, 48}; + byte[] b_org = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, + 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, + 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, + 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, + 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, + 1, 48, 46, 48, 1, 48, 46, 48}; + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + storage.clean(address); + byte[] addr = null; + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + addr = storage.store(new Slice(b, 0, b.length)); + } + storage = getStorage("1", true); + Assert.assertNull(storage.retrieve(addr)); + for (int i = 0; i < 5; i++) { + b[0] = (byte)(b[0] + 1); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + } + storage.flush(); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieve(address), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + b_org[0] = (byte)(b_org[0] + 1); + match(storage.retrieveNext(), new String(b_org)); + + } + + /** + * This test covers following use case The file is flushed and then more data is written to the same file, but the new + * data is not flushed and file is not roll over and storage fails The new storage comes up and client asks for data + * at the last returned address from earlier storage instance. The new storage returns null. Client stores the data + * again but the address returned this time is null and the retrieval of the earlier address now returns data + * + * @throws Exception + */ + @Test + public void testPartialFlushWithFailure() throws Exception + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = "ab".getBytes(); + byte[] address = storage.store(new Slice(b, 0, b.length)); + Assert.assertNotNull(address); + storage.flush(); + b = "cb".getBytes(); + byte[] addr = storage.store(new Slice(b, 0, b.length)); + storage = getStorage("1", true); + Assert.assertNull(storage.retrieve(addr)); + Assert.assertNull(storage.store(new Slice(b, 0, b.length))); + storage.flush(); + match(storage.retrieve(address), "cb"); + } + + private void match(byte[] data, String match) + { + byte[] tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + Assert.assertEquals("matched the stored value with retrieved value", match, new String(tempData)); + } + + @Test + public void testStorage() throws IOException + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = new byte[200]; + byte[] identifier; + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + Assert.assertNull(storage.retrieve(new byte[8])); + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + storage.flush(); + byte[] data = storage.retrieve(new byte[8]); + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + identifier = storage.store(new Slice(b, 0, b.length)); + byte[] tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData)); + Assert.assertNull(storage.retrieve(identifier)); + } + + @Test + public void testStorageWithRestore() throws IOException + { + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = new byte[200]; + Assert.assertNotNull(storage.store(new Slice(b, 0, b.length))); + storage.flush(); + storage.teardown(); + + storage = getStorage("1", true); + storage.store(new Slice(b, 0, b.length)); + storage.flush(); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + boolean exists = fs.exists(new Path(STORAGE_DIRECTORY + "/1/" + "1")); + Assert.assertEquals("file should exist", true, exists); + } + + @Test + public void testCleanup() throws IOException + { + RandomAccessFile r = new RandomAccessFile(testMeta.testFile, "r"); + r.seek(0); + byte[] b = r.readLine().getBytes(); + storage.store(new Slice(b, 0, b.length)); + byte[] val = storage.store(new Slice(b, 0, b.length)); + storage.flush(); + storage.clean(val); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + boolean exists = fs.exists(new Path(STORAGE_DIRECTORY + "/" + "0")); + Assert.assertEquals("file should not exist", false, exists); + r.close(); + } + + @Test + public void testNext() throws IOException + { + RandomAccessFile r = new RandomAccessFile(testMeta.testFile, "r"); + r.seek(0); + Assert.assertNull(storage.retrieve(new byte[8])); + byte[] b = r.readLine().getBytes(); + storage.store(new Slice(b, 0, b.length)); + byte[] b1 = r.readLine().getBytes(); + storage.store(new Slice(b1, 0, b1.length)); + storage.store(new Slice(b, 0, b.length)); + storage.flush(); + storage.store(new Slice(b1, 0, b1.length)); + storage.store(new Slice(b, 0, b.length)); + storage.flush(); + byte[] data = storage.retrieve(new byte[8]); + byte[] tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData)); + data = storage.retrieveNext(); + tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + Assert.assertEquals("matched the stored value with retrieved value", new String(b1), new String(tempData)); + data = storage.retrieveNext(); + tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData)); + r.close(); + } + + @Test + public void testFailure() throws IOException + { + byte[] address; + byte[] b = new byte[200]; + storage.retrieve(new byte[8]); + for (int i = 0; i < 5; i++) { + storage.store(new Slice(b, 0, b.length)); + address = storage.store(new Slice(b, 0, b.length)); + storage.flush(); + storage.clean(address); + } + storage.teardown(); + + byte[] identifier = new byte[8]; + storage = getStorage("1", true); + + storage.retrieve(identifier); + + storage.store(new Slice(b, 0, b.length)); + storage.store(new Slice(b, 0, b.length)); + storage.store(new Slice(b, 0, b.length)); + storage.flush(); + byte[] data = storage.retrieve(identifier); + byte[] tempData = new byte[data.length - 8]; + System.arraycopy(data, 8, tempData, 0, tempData.length); + Assert.assertEquals("matched the stored value with retrieved value", new String(b), new String(tempData)); + } + + /** + * This test case tests the clean call before any flush is called. + * + * @throws IOException + */ + @Test + public void testCleanUnflushedData() throws IOException + { + for (int i = 0; i < 5; i++) { + final byte[] bytes = (i + "").getBytes(); + storage.store(new Slice(bytes, 0, bytes.length)); + } + storage.clean(new byte[8]); + storage.flush(); + match(storage.retrieve(new byte[8]), "0"); + match(storage.retrieveNext(), "1"); + } + + @Test + public void testCleanForUnflushedData() throws IOException + { + byte[] address = null; + byte[] b = new byte[200]; + storage.retrieve(new byte[8]); + for (int i = 0; i < 5; i++) { + storage.store(new Slice(b, 0, b.length)); + address = storage.store(new Slice(b, 0, b.length)); + storage.flush(); + // storage.clean(address); + } + byte[] lastWrittenAddress = null; + for (int i = 0; i < 5; i++) { + storage.store(new Slice(b, 0, b.length)); + lastWrittenAddress = storage.store(new Slice(b, 0, b.length)); + } + storage.clean(lastWrittenAddress); + byte[] cleanedOffset = storage.readData(new Path(STORAGE_DIRECTORY + "/1/cleanoffsetFile")); + Assert.assertArrayEquals(address, cleanedOffset); + + } + + @Test + public void testCleanForFlushedData() throws IOException + { + byte[] b = new byte[200]; + storage.retrieve(new byte[8]); + for (int i = 0; i < 5; i++) { + storage.store(new Slice(b, 0, b.length)); + storage.store(new Slice(b, 0, b.length)); + storage.flush(); + // storage.clean(address); + } + byte[] lastWrittenAddress = null; + for (int i = 0; i < 5; i++) { + storage.store(new Slice(b, 0, b.length)); + lastWrittenAddress = storage.store(new Slice(b, 0, b.length)); + } + storage.flush(); + storage.clean(lastWrittenAddress); + byte[] cleanedOffset = storage.readData(new Path(STORAGE_DIRECTORY + "/1/cleanoffsetFile")); + Assert.assertArrayEquals(lastWrittenAddress, cleanedOffset); + + } + + @Test + public void testCleanForPartialFlushedData() throws IOException + { + byte[] b = new byte[8]; + storage.retrieve(new byte[8]); + + storage.store(new Slice(b, 0, b.length)); + byte[] bytes = "1a".getBytes(); + byte[] address = storage.store(new Slice(bytes, 0, bytes.length)); + storage.flush(); + storage.clean(address); + + byte[] lastWrittenAddress = null; + for (int i = 0; i < 5; i++) { + final byte[] bytes1 = (i + "").getBytes(); + storage.store(new Slice(bytes1, 0, bytes1.length)); + lastWrittenAddress = storage.store(new Slice(b, 0, b.length)); + } + Assert.assertNull(storage.retrieve(new byte[8])); + Assert.assertNull(storage.retrieve(lastWrittenAddress)); + storage.store(new Slice(b, 0, b.length)); + storage.flush(); + Assert.assertNull(storage.retrieve(lastWrittenAddress)); + } + + @Test + public void testRandomSequence() throws IOException + { + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + byte[] bytes = new byte[]{48, 48, 48, 51, 101, 100, 55, 56, 55, 49, 53, 99, 52, 101, 55, 50, 97, 52, 48, 49, 51, + 99, 97, 54, 102, 57, 55, 53, 57, 100, 49, 99, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, + 45, 49, 49, 45, 48, 55, 32, 48, 48, 58, 48, 48, 58, 52, 54, 1, 52, 50, 49, 50, 51, 1, 50, 1, 49, 53, 49, 49, + 52, 50, 54, 53, 1, 49, 53, 49, 49, 57, 51, 53, 49, 1, 49, 53, 49, 50, 57, 56, 50, 52, 1, 49, 53, 49, 50, 49, + 55, 48, 55, 1, 49, 48, 48, 55, 55, 51, 57, 51, 1, 49, 57, 49, 52, 55, 50, 53, 52, 54, 49, 1, 49, 1, 48, 1, 48, + 46, 48, 1, 48, 46, 48, 1, 48, 46, 48}; + storage.store(new Slice(bytes, 0, bytes.length)); + storage.flush(); + storage.clean(new byte[]{-109, 0, 0, 0, 0, 0, 0, 0}); + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + for (int i = 0; i < 2555; i++) { + byte[] bytes1 = new byte[]{48, 48, 48, 55, 56, 51, 98, 101, 50, 54, 50, 98, 52, 102, 50, 54, 56, 97, 55, 56, 102, + 48, 54, 54, 50, 49, 49, 54, 99, 98, 101, 99, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, + 45, 49, 49, 45, 48, 55, 32, 48, 48, 58, 48, 48, 58, 53, 49, 1, 49, 49, 49, 49, 54, 51, 57, 1, 50, 1, 49, 53, + 49, 48, 57, 57, 56, 51, 1, 49, 53, 49, 49, 49, 55, 48, 52, 1, 49, 53, 49, 50, 49, 51, 55, 49, 1, 49, 53, 49, + 49, 52, 56, 51, 49, 1, 49, 48, 48, 55, 49, 57, 56, 49, 1, 49, 50, 48, 50, 55, 54, 49, 54, 56, 53, 1, 49, 1, + 48, 1, 48, 46, 48, 1, 48, 46, 48, 1, 48, 46, 48}; + storage.store(new Slice(bytes1, 0, bytes1.length)); + storage.flush(); + } + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + for (int i = 0; i < 1297; i++) { + storage.retrieveNext(); + } + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + for (int i = 0; i < 1302; i++) { + storage.retrieveNext(); + } + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + for (int i = 0; i < 1317; i++) { + storage.retrieveNext(); + } + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + for (int i = 0; i < 2007; i++) { + storage.retrieveNext(); + } + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + for (int i = 0; i < 2556; i++) { + storage.retrieveNext(); + } + byte[] bytes1 = new byte[]{48, 48, 48, 48, 98, 48, 52, 54, 49, 57, 55, 51, 52, 97, 53, 101, 56, 56, 97, 55, 98, 53, + 52, 51, 98, 50, 102, 51, 49, 97, 97, 54, 1, 50, 48, 49, 51, 45, 49, 49, 45, 48, 55, 1, 50, 48, 49, 51, 45, 49, + 49, 45, 48, 55, 32, 48, 48, 58, 51, 49, 58, 52, 56, 1, 49, 48, 53, 53, 57, 52, 50, 1, 50, 1, 49, 53, 49, 49, + 54, 49, 56, 52, 1, 49, 53, 49, 49, 57, 50, 49, 49, 1, 49, 53, 49, 50, 57, 54, 54, 53, 1, 49, 53, 49, 50, 49, + 53, 52, 56, 1, 49, 48, 48, 56, 48, 51, 52, 50, 1, 55, 56, 56, 50, 54, 53, 52, 56, 1, 49, 1, 48, 1, 48, 46, 48, + 1, 48, 46, 48, 1, 48, 46, 48}; + storage.store(new Slice(bytes1, 0, bytes1.length)); + storage.flush(); + storage.retrieve(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}); + for (int i = 0; i < 2062; i++) { + storage.retrieveNext(); + + } + } + + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(HDFSStorageTest.class); +}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/flume/conf/flume-conf.properties ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/flume/conf/flume-conf.properties b/flume/src/test/resources/flume/conf/flume-conf.properties index 29c81c0..b796e6d 100644 --- a/flume/src/test/resources/flume/conf/flume-conf.properties +++ b/flume/src/test/resources/flume/conf/flume-conf.properties @@ -63,7 +63,7 @@ agent1.sources.netcatSource.command = src/test/bash/subcat_periodically src/test # first sink - dt agent1.sinks.dt.id = CEVL00P -agent1.sinks.dt.type = com.datatorrent.flume.sink.DTFlumeSink +agent1.sinks.dt.type = org.apache.apex.malhar.flume.sink.DTFlumeSink agent1.sinks.dt.hostname = localhost agent1.sinks.dt.port = 8080 agent1.sinks.dt.sleepMillis = 7 @@ -72,13 +72,13 @@ agent1.sinks.dt.maximumEventsPerTransaction = 5000 agent1.sinks.dt.minimumEventsPerTransaction = 1 # Ensure that we do not lose the data handed over to us by flume. - agent1.sinks.dt.storage = com.datatorrent.flume.storage.HDFSStorage + agent1.sinks.dt.storage = org.apache.apex.malhar.flume.storage.HDFSStorage agent1.sinks.dt.storage.restore = false agent1.sinks.dt.storage.baseDir = /tmp/flume101 agent1.sinks.dt.channel = ch1 # Ensure that we are able to detect flume sinks (and failures) automatically. - agent1.sinks.dt.discovery = com.datatorrent.flume.discovery.ZKAssistedDiscovery + agent1.sinks.dt.discovery = org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery agent1.sinks.dt.discovery.connectionString = 127.0.0.1:2181 agent1.sinks.dt.discovery.basePath = /HelloDT agent1.sinks.dt.discovery.connectionTimeoutMillis = 1000 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/log4j.properties b/flume/src/test/resources/log4j.properties index ac0a107..b7516d4 100644 --- a/flume/src/test/resources/log4j.properties +++ b/flume/src/test/resources/log4j.properties @@ -1,9 +1,11 @@ # -# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121500 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121500 b/flume/src/test/resources/test_data/gentxns/2013121500 deleted file mode 100644 index 3ce5646..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121500 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121500.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121500.txt b/flume/src/test/resources/test_data/gentxns/2013121500.txt new file mode 100644 index 0000000..3ce5646 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121500.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121501 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121501 b/flume/src/test/resources/test_data/gentxns/2013121501 deleted file mode 100644 index b2e70c0..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121501 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121501.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121501.txt b/flume/src/test/resources/test_data/gentxns/2013121501.txt new file mode 100644 index 0000000..b2e70c0 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121501.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121502 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121502 b/flume/src/test/resources/test_data/gentxns/2013121502 deleted file mode 100644 index ec13862..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121502 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121502.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121502.txt b/flume/src/test/resources/test_data/gentxns/2013121502.txt new file mode 100644 index 0000000..ec13862 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121502.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121503 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121503 b/flume/src/test/resources/test_data/gentxns/2013121503 deleted file mode 100644 index 8267dd3..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121503 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121503.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121503.txt b/flume/src/test/resources/test_data/gentxns/2013121503.txt new file mode 100644 index 0000000..8267dd3 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121503.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121504 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121504 b/flume/src/test/resources/test_data/gentxns/2013121504 deleted file mode 100644 index addfe62..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121504 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121504.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121504.txt b/flume/src/test/resources/test_data/gentxns/2013121504.txt new file mode 100644 index 0000000..addfe62 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121504.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121505 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121505 b/flume/src/test/resources/test_data/gentxns/2013121505 deleted file mode 100644 index d76aa9f..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121505 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121505.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121505.txt b/flume/src/test/resources/test_data/gentxns/2013121505.txt new file mode 100644 index 0000000..d76aa9f Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121505.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121506 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121506 b/flume/src/test/resources/test_data/gentxns/2013121506 deleted file mode 100644 index 2f5bbb6..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121506 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121506.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121506.txt b/flume/src/test/resources/test_data/gentxns/2013121506.txt new file mode 100644 index 0000000..2f5bbb6 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121506.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121507 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121507 b/flume/src/test/resources/test_data/gentxns/2013121507 deleted file mode 100644 index a022dad..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121507 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121507.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121507.txt b/flume/src/test/resources/test_data/gentxns/2013121507.txt new file mode 100644 index 0000000..a022dad Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121507.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121508 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121508 b/flume/src/test/resources/test_data/gentxns/2013121508 deleted file mode 100644 index d1e7f5c..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121508 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121508.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121508.txt b/flume/src/test/resources/test_data/gentxns/2013121508.txt new file mode 100644 index 0000000..d1e7f5c Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121508.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121509 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121509 b/flume/src/test/resources/test_data/gentxns/2013121509 deleted file mode 100644 index 10d61de..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121509 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121509.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121509.txt b/flume/src/test/resources/test_data/gentxns/2013121509.txt new file mode 100644 index 0000000..10d61de Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121509.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121510 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121510 b/flume/src/test/resources/test_data/gentxns/2013121510 deleted file mode 100644 index c2f76c8..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121510 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121510.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121510.txt b/flume/src/test/resources/test_data/gentxns/2013121510.txt new file mode 100644 index 0000000..c2f76c8 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121510.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121511 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121511 b/flume/src/test/resources/test_data/gentxns/2013121511 deleted file mode 100644 index bf16cfe..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121511 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121511.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121511.txt b/flume/src/test/resources/test_data/gentxns/2013121511.txt new file mode 100644 index 0000000..bf16cfe Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121511.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121512 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121512 b/flume/src/test/resources/test_data/gentxns/2013121512 deleted file mode 100644 index fe75419..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121512 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121512.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121512.txt b/flume/src/test/resources/test_data/gentxns/2013121512.txt new file mode 100644 index 0000000..fe75419 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121512.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121513 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121513 b/flume/src/test/resources/test_data/gentxns/2013121513 deleted file mode 100644 index 3094cae..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121513 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121513.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121513.txt b/flume/src/test/resources/test_data/gentxns/2013121513.txt new file mode 100644 index 0000000..3094cae Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121513.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121514 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121514 b/flume/src/test/resources/test_data/gentxns/2013121514 deleted file mode 100644 index 6e00e4a..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121514 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121514.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121514.txt b/flume/src/test/resources/test_data/gentxns/2013121514.txt new file mode 100644 index 0000000..6e00e4a Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121514.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121515 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121515 b/flume/src/test/resources/test_data/gentxns/2013121515 deleted file mode 100644 index b860e43..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121515 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121515.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121515.txt b/flume/src/test/resources/test_data/gentxns/2013121515.txt new file mode 100644 index 0000000..b860e43 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121515.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121516 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121516 b/flume/src/test/resources/test_data/gentxns/2013121516 deleted file mode 100644 index dfb5854..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121516 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121516.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121516.txt b/flume/src/test/resources/test_data/gentxns/2013121516.txt new file mode 100644 index 0000000..dfb5854 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121516.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121517 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121517 b/flume/src/test/resources/test_data/gentxns/2013121517 deleted file mode 100644 index c8da2cc..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121517 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121517.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121517.txt b/flume/src/test/resources/test_data/gentxns/2013121517.txt new file mode 100644 index 0000000..c8da2cc Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121517.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121518 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121518 b/flume/src/test/resources/test_data/gentxns/2013121518 deleted file mode 100644 index 2cb628b..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121518 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121518.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121518.txt b/flume/src/test/resources/test_data/gentxns/2013121518.txt new file mode 100644 index 0000000..2cb628b Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121518.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121519 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121519 b/flume/src/test/resources/test_data/gentxns/2013121519 deleted file mode 100644 index 6fab9d9..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121519 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121519.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121519.txt b/flume/src/test/resources/test_data/gentxns/2013121519.txt new file mode 100644 index 0000000..6fab9d9 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121519.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121520 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121520 b/flume/src/test/resources/test_data/gentxns/2013121520 deleted file mode 100644 index ba56d49..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121520 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121520.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121520.txt b/flume/src/test/resources/test_data/gentxns/2013121520.txt new file mode 100644 index 0000000..ba56d49 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121520.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121521 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121521 b/flume/src/test/resources/test_data/gentxns/2013121521 deleted file mode 100644 index 37de926..0000000 Binary files a/flume/src/test/resources/test_data/gentxns/2013121521 and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/test/resources/test_data/gentxns/2013121521.txt ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121521.txt b/flume/src/test/resources/test_data/gentxns/2013121521.txt new file mode 100644 index 0000000..37de926 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121521.txt differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5b531ea..adc6de5 100644 --- a/pom.xml +++ b/pom.xml @@ -202,6 +202,7 @@ <module>apps</module> <module>samples</module> <module>sql</module> + <module>flume</module> </modules> </profile> </profiles>
