Repository: incubator-apex-malhar Updated Branches: refs/heads/master c5cab8bd5 -> c3f6951a5
APEXMALHAR-2031 enable WindowDataManager to save window files under user configured path Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/dec419fc Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/dec419fc Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/dec419fc Branch: refs/heads/master Commit: dec419fc2ddbd4a82598a255c25cdf774d0c0efc Parents: c5cab8b Author: Chandni Singh <[email protected]> Authored: Mon Mar 28 15:57:19 2016 -0700 Committer: Chandni Singh <[email protected]> Committed: Mon Mar 28 16:14:59 2016 -0700 ---------------------------------------------------------------------- .../datatorrent/lib/util/WindowDataManager.java | 35 ++- .../lib/util/FSWindowDataManagerTest.java | 216 +++++++++++++++++++ .../com/datatorrent/lib/util/TestUtils.java | 22 +- .../lib/util/WindowDataManagerTest.java | 200 ----------------- 4 files changed, 267 insertions(+), 206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dec419fc/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java index 7517cd4..9930d7e 100644 --- a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java +++ b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java @@ -113,6 +113,8 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera @NotNull private String recoveryPath; + private boolean isRecoveryPathRelativeToAppPath = true; + /** * largest window for which there is recovery data across all physical operator instances. */ @@ -144,7 +146,11 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera public void setup(Context.OperatorContext context) { Configuration configuration = new Configuration(); - appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath); + if (isRecoveryPathRelativeToAppPath) { + appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath); + } else { + appPath = new Path(recoveryPath); + } try { storageAgent = new FSStorageAgent(appPath.toString(), configuration); @@ -333,15 +339,42 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera } } + /** + * @return recovery path + */ public String getRecoveryPath() { return recoveryPath; } + /** + * Sets the recovery path. If {@link #isRecoveryPathRelativeToAppPath} is true then this path is handled relative + * to the application path; otherwise it is handled as an absolute path. + * + * @param recoveryPath recovery path + */ public void setRecoveryPath(String recoveryPath) { this.recoveryPath = recoveryPath; } + + /** + * @return true if recovery path is relative to app path; false otherwise. + */ + public boolean isRecoveryPathRelativeToAppPath() + { + return isRecoveryPathRelativeToAppPath; + } + + /** + * Specifies whether the recovery path is relative to application path. + * + * @param recoveryPathRelativeToAppPath true if recovery path is relative to application path; false otherwise. + */ + public void setRecoveryPathRelativeToAppPath(boolean recoveryPathRelativeToAppPath) + { + isRecoveryPathRelativeToAppPath = recoveryPathRelativeToAppPath; + } } /** http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dec419fc/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java b/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java new file mode 100644 index 0000000..26996e7 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.util; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.TreeSet; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.lib.helper.OperatorContextTestHelper; + +/** + * Tests for {@link WindowDataManager} + */ +public class FSWindowDataManagerTest +{ + private static class TestMeta extends TestWatcher + { + + String applicationPath; + WindowDataManager.FSWindowDataManager storageManager; + Context.OperatorContext context; + + @Override + protected void starting(Description description) + { + TestUtils.deleteTargetTestClassFolder(description); + super.starting(description); + storageManager = new WindowDataManager.FSWindowDataManager(); + applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); + + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, applicationPath); + context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes); + } + + @Override + protected void finished(Description description) + { + TestUtils.deleteTargetTestClassFolder(description); + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testLargestRecoveryWindow() + { + testMeta.storageManager.setup(testMeta.context); + Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, testMeta.storageManager.getLargestRecoveryWindow()); + testMeta.storageManager.teardown(); + } + + @Test + public void testSave() throws IOException + { + testMeta.storageManager.setup(testMeta.context); + Map<Integer, String> data = Maps.newHashMap(); + data.put(1, "one"); + data.put(2, "two"); + data.put(3, "three"); + testMeta.storageManager.save(data, 1, 1); + testMeta.storageManager.setup(testMeta.context); + @SuppressWarnings("unchecked") + Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1); + Assert.assertEquals("dataOf1", data, decoded); + testMeta.storageManager.teardown(); + } + + @Test + public void testLoad() throws IOException + { + testMeta.storageManager.setup(testMeta.context); + Map<Integer, String> dataOf1 = Maps.newHashMap(); + dataOf1.put(1, "one"); + dataOf1.put(2, "two"); + dataOf1.put(3, "three"); + + Map<Integer, String> dataOf2 = Maps.newHashMap(); + dataOf2.put(4, "four"); + dataOf2.put(5, "five"); + dataOf2.put(6, "six"); + + testMeta.storageManager.save(dataOf1, 1, 1); + testMeta.storageManager.save(dataOf2, 2, 1); + testMeta.storageManager.setup(testMeta.context); + Map<Integer, Object> decodedStates = testMeta.storageManager.load(1); + Assert.assertEquals("no of states", 2, decodedStates.size()); + for (Integer operatorId : decodedStates.keySet()) { + if (operatorId == 1) { + Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1)); + } else { + Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2)); + } + } + testMeta.storageManager.teardown(); + } + + @Test + public void testRecovery() throws IOException + { + testMeta.storageManager.setup(testMeta.context); + Map<Integer, String> dataOf1 = Maps.newHashMap(); + dataOf1.put(1, "one"); + dataOf1.put(2, "two"); + dataOf1.put(3, "three"); + + Map<Integer, String> dataOf2 = Maps.newHashMap(); + dataOf2.put(4, "four"); + dataOf2.put(5, "five"); + dataOf2.put(6, "six"); + + testMeta.storageManager.save(dataOf1, 1, 1); + testMeta.storageManager.save(dataOf2, 2, 2); + + testMeta.storageManager.setup(testMeta.context); + Assert.assertEquals("largest recovery window", 2, testMeta.storageManager.getLargestRecoveryWindow()); + testMeta.storageManager.teardown(); + } + + @Test + public void testDelete() throws IOException + { + testMeta.storageManager.setup(testMeta.context); + Map<Integer, String> dataOf1 = Maps.newHashMap(); + dataOf1.put(1, "one"); + dataOf1.put(2, "two"); + dataOf1.put(3, "three"); + + Map<Integer, String> dataOf2 = Maps.newHashMap(); + dataOf2.put(4, "four"); + dataOf2.put(5, "five"); + dataOf2.put(6, "six"); + + Map<Integer, String> dataOf3 = Maps.newHashMap(); + dataOf2.put(7, "seven"); + dataOf2.put(8, "eight"); + dataOf2.put(9, "nine"); + + for (int i = 1; i <= 9; ++i) { + testMeta.storageManager.save(dataOf1, 1, i); + } + + testMeta.storageManager.save(dataOf2, 2, 1); + testMeta.storageManager.save(dataOf3, 3, 1); + + testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager), + Sets.newHashSet(2, 3)); + testMeta.storageManager.setup(testMeta.context); + testMeta.storageManager.deleteUpTo(1, 6); + + Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath()); + FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration()); + FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1))); + Assert.assertEquals("number of windows for 1", 3, fileStatuses.length); + TreeSet<String> windows = Sets.newTreeSet(); + for (FileStatus fileStatus : fileStatuses) { + windows.add(fileStatus.getPath().getName()); + } + Assert.assertEquals("window list for 1", Sets.newTreeSet(Arrays.asList("7", "8", "9")), windows); + Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2)))); + Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3)))); + testMeta.storageManager.teardown(); + } + + @Test + public void testAbsoluteRecoveryPath() throws IOException + { + testMeta.storageManager.setRecoveryPathRelativeToAppPath(false); + long time = System.currentTimeMillis(); + testMeta.storageManager.setRecoveryPath("target/" + time); + testSave(); + File recoveryDir = new File("target/" + time); + Assert.assertTrue("recover path exist", recoveryDir.isDirectory()); + FileUtils.deleteDirectory(recoveryDir); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dec419fc/library/src/test/java/com/datatorrent/lib/util/TestUtils.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java index 37d55e7..37aa7e7 100644 --- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java +++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java @@ -18,18 +18,21 @@ */ package com.datatorrent.lib.util; -import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.util.List; import org.junit.rules.TestWatcher; +import org.junit.runner.Description; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; +import org.apache.commons.io.FileUtils; -import com.datatorrent.api.*; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.OutputPort; +import com.datatorrent.api.Sink; +import com.datatorrent.api.Stats; +import com.datatorrent.api.StatsListener; public class TestUtils { @@ -51,6 +54,15 @@ public class TestUtils } } + public static void deleteTargetTestClassFolder(Description description) + { + try { + FileUtils.deleteDirectory(new File("target/" + description.getClassName())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @SuppressWarnings({ "unchecked", "rawtypes" }) public static <S extends Sink, T> S setSink(OutputPort<T> port, S sink) { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dec419fc/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java deleted file mode 100644 index 845b992..0000000 --- a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.util; - -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.Map; -import java.util.TreeSet; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import com.datatorrent.api.Attribute; -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.annotation.Stateless; -import com.datatorrent.lib.helper.OperatorContextTestHelper; - -/** - * Tests for {@link WindowDataManager} - */ -public class WindowDataManagerTest -{ - private static class TestMeta extends TestWatcher - { - - String applicationPath; - WindowDataManager.FSWindowDataManager storageManager; - Context.OperatorContext context; - - @Override - protected void starting(Description description) - { - super.starting(description); - storageManager = new WindowDataManager.FSWindowDataManager(); - applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); - - Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); - attributes.put(DAG.APPLICATION_PATH, applicationPath); - context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes); - - storageManager.setup(context); - } - - @Override - protected void finished(Description description) - { - storageManager.teardown(); - try { - FileUtils.deleteDirectory(new File("target/" + description.getClassName())); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - @Rule - public TestMeta testMeta = new TestMeta(); - - @Test - public void testLargestRecoveryWindow() - { - Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, testMeta.storageManager.getLargestRecoveryWindow()); - } - - @Test - public void testSave() throws IOException - { - Map<Integer, String> data = Maps.newHashMap(); - data.put(1, "one"); - data.put(2, "two"); - data.put(3, "three"); - testMeta.storageManager.save(data, 1, 1); - testMeta.storageManager.setup(testMeta.context); - @SuppressWarnings("unchecked") - Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1); - Assert.assertEquals("dataOf1", data, decoded); - } - - @Test - public void testLoad() throws IOException - { - Map<Integer, String> dataOf1 = Maps.newHashMap(); - dataOf1.put(1, "one"); - dataOf1.put(2, "two"); - dataOf1.put(3, "three"); - - Map<Integer, String> dataOf2 = Maps.newHashMap(); - dataOf2.put(4, "four"); - dataOf2.put(5, "five"); - dataOf2.put(6, "six"); - - testMeta.storageManager.save(dataOf1, 1, 1); - testMeta.storageManager.save(dataOf2, 2, 1); - testMeta.storageManager.setup(testMeta.context); - Map<Integer, Object> decodedStates = testMeta.storageManager.load(1); - Assert.assertEquals("no of states", 2, decodedStates.size()); - for (Integer operatorId : decodedStates.keySet()) { - if (operatorId == 1) { - Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1)); - } else { - Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2)); - } - } - } - - @Test - public void testRecovery() throws IOException - { - Map<Integer, String> dataOf1 = Maps.newHashMap(); - dataOf1.put(1, "one"); - dataOf1.put(2, "two"); - dataOf1.put(3, "three"); - - Map<Integer, String> dataOf2 = Maps.newHashMap(); - dataOf2.put(4, "four"); - dataOf2.put(5, "five"); - dataOf2.put(6, "six"); - - testMeta.storageManager.save(dataOf1, 1, 1); - testMeta.storageManager.save(dataOf2, 2, 2); - - testMeta.storageManager.setup(testMeta.context); - Assert.assertEquals("largest recovery window", 2, testMeta.storageManager.getLargestRecoveryWindow()); - } - - @Test - public void testDelete() throws IOException - { - Map<Integer, String> dataOf1 = Maps.newHashMap(); - dataOf1.put(1, "one"); - dataOf1.put(2, "two"); - dataOf1.put(3, "three"); - - Map<Integer, String> dataOf2 = Maps.newHashMap(); - dataOf2.put(4, "four"); - dataOf2.put(5, "five"); - dataOf2.put(6, "six"); - - Map<Integer, String> dataOf3 = Maps.newHashMap(); - dataOf2.put(7, "seven"); - dataOf2.put(8, "eight"); - dataOf2.put(9, "nine"); - - for (int i = 1; i <= 9; ++i) { - testMeta.storageManager.save(dataOf1, 1, i); - } - - testMeta.storageManager.save(dataOf2, 2, 1); - testMeta.storageManager.save(dataOf3, 3, 1); - - testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager), - Sets.newHashSet(2, 3)); - testMeta.storageManager.setup(testMeta.context); - testMeta.storageManager.deleteUpTo(1, 6); - - Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath()); - FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration()); - FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1))); - Assert.assertEquals("number of windows for 1", 3, fileStatuses.length); - TreeSet<String> windows = Sets.newTreeSet(); - for (FileStatus fileStatus : fileStatuses) { - windows.add(fileStatus.getPath().getName()); - } - Assert.assertEquals("window list for 1", Sets.newLinkedHashSet(Arrays.asList("7", "8", "9")), windows); - Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2)))); - Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3)))); - } - -}
