aplex commented on a change in pull request #3054: URL: https://github.com/apache/incubator-gobblin/pull/3054#discussion_r449186127
########## File path: gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java ########## @@ -78,49 +82,101 @@ public void setup() throws Exception { public void testWrite() throws Exception { String streamString1 = "testContents1"; String streamString2 = "testContents2"; + String userDefStagingDir = System.getProperty("user.dir") + "/user_staging_dir"; + + testRootDir = new Path(Paths.get("").toAbsolutePath().toString(), + getClass().getSimpleName()); + localFs = FileSystem.getLocal(new Configuration()); + + Path ds1 = createDatasetPath("db1"); + String dataSetDefinedStagingDir = testRootDir + "/db1/staging"; + FileStatus status = fs.getFileStatus(testTempPath); OwnerAndPermission ownerAndPermission = new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission); + CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source"))); + WorkUnitState state = TestUtils.createTestWorkUnitState(); + state.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,false); state.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false); state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, "staging").toString()); state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, "output").toString()); state.setProp(ConfigurationKeys.WRITER_FILE_PATH, RandomStringUtils.randomAlphabetic(5)); CopySource.serializeCopyEntity(state, cf); CopySource.serializeCopyableDataset(state, metadata); + FileAwareInputStreamDataWriter dataWriter = new FileAwareInputStreamDataWriter(state, 1, 0); + FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder().file(cf) .inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString1))).build(); + Assert.assertNotEquals(dataWriter.stagingDir,userDefStagingDir); + 
Assert.assertNotEquals(dataWriter.stagingDir,dataSetDefinedStagingDir); + dataWriter.write(fileAwareInputStream); dataWriter.commit(); Path writtenFilePath = new Path(new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), cf.getDatasetAndPartition(metadata).identifier()), cf.getDestination()); + Assert.assertEquals(IOUtils.toString(new FileInputStream(writtenFilePath.toString())), streamString1); //testing user defined staging directory WorkUnitState state2 = TestUtils.createTestWorkUnitState(); + state2.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,false); state2.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,true); state2.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, "staging").toString()); state2.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, "output2").toString()); state2.setProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR,userDefStagingDir); state2.setProp(ConfigurationKeys.WRITER_FILE_PATH, RandomStringUtils.randomAlphabetic(5)); CopySource.serializeCopyEntity(state2, cf); CopySource.serializeCopyableDataset(state2, metadata); + dataWriter = new FileAwareInputStreamDataWriter(state2, 1, 0); + fileAwareInputStream = FileAwareInputStream.builder().file(cf) .inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString2))).build(); + Assert.assertEquals(dataWriter.stagingDir.toUri().toString(),userDefStagingDir); + Assert.assertNotEquals(dataWriter.stagingDir,dataSetDefinedStagingDir); + dataWriter.write(fileAwareInputStream); dataWriter.commit(); writtenFilePath = new Path(new Path(state2.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), cf.getDatasetAndPartition(metadata).identifier()), cf.getDestination()); + Assert.assertEquals(IOUtils.toString(new FileInputStream(writtenFilePath.toString())), streamString2); + + //testing dataset defined staging directory Review comment: Can we split this large test into several smaller unit tests, each focusing on a specific use case? 
Then it will be easier to understand what we are testing and expecting. The common logic can be moved to separate methods that we call explicitly. TestNG can also call them before each test, if we put a @BeforeTest/@BeforeClass annotation on them. ########## File path: gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java ########## @@ -78,49 +82,101 @@ public void setup() throws Exception { public void testWrite() throws Exception { String streamString1 = "testContents1"; String streamString2 = "testContents2"; + String userDefStagingDir = System.getProperty("user.dir") + "/user_staging_dir"; + + testRootDir = new Path(Paths.get("").toAbsolutePath().toString(), + getClass().getSimpleName()); + localFs = FileSystem.getLocal(new Configuration()); + + Path ds1 = createDatasetPath("db1"); + String dataSetDefinedStagingDir = testRootDir + "/db1/staging"; + FileStatus status = fs.getFileStatus(testTempPath); OwnerAndPermission ownerAndPermission = new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission); + CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source"))); + WorkUnitState state = TestUtils.createTestWorkUnitState(); + state.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,false); state.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false); state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, "staging").toString()); state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, "output").toString()); state.setProp(ConfigurationKeys.WRITER_FILE_PATH, RandomStringUtils.randomAlphabetic(5)); CopySource.serializeCopyEntity(state, cf); CopySource.serializeCopyableDataset(state, metadata); + FileAwareInputStreamDataWriter dataWriter = new FileAwareInputStreamDataWriter(state, 
1, 0); + FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder().file(cf) .inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString1))).build(); + Assert.assertNotEquals(dataWriter.stagingDir,userDefStagingDir); + Assert.assertNotEquals(dataWriter.stagingDir,dataSetDefinedStagingDir); + dataWriter.write(fileAwareInputStream); dataWriter.commit(); Path writtenFilePath = new Path(new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), cf.getDatasetAndPartition(metadata).identifier()), cf.getDestination()); + Assert.assertEquals(IOUtils.toString(new FileInputStream(writtenFilePath.toString())), streamString1); //testing user defined staging directory WorkUnitState state2 = TestUtils.createTestWorkUnitState(); + state2.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,false); state2.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,true); state2.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, "staging").toString()); state2.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, "output2").toString()); state2.setProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR,userDefStagingDir); state2.setProp(ConfigurationKeys.WRITER_FILE_PATH, RandomStringUtils.randomAlphabetic(5)); CopySource.serializeCopyEntity(state2, cf); CopySource.serializeCopyableDataset(state2, metadata); + dataWriter = new FileAwareInputStreamDataWriter(state2, 1, 0); + fileAwareInputStream = FileAwareInputStream.builder().file(cf) .inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString2))).build(); + Assert.assertEquals(dataWriter.stagingDir.toUri().toString(),userDefStagingDir); + Assert.assertNotEquals(dataWriter.stagingDir,dataSetDefinedStagingDir); + dataWriter.write(fileAwareInputStream); dataWriter.commit(); writtenFilePath = new Path(new Path(state2.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), cf.getDatasetAndPartition(metadata).identifier()), cf.getDestination()); + Assert.assertEquals(IOUtils.toString(new 
FileInputStream(writtenFilePath.toString())), streamString2); + + //testing dataset defined staging directory + CopyableDatasetMetadata metadata2 = new CopyableDatasetMetadata(new TestCopyableDataset(new Path(testRootDir + "/db1"))); + WorkUnitState state3 = TestUtils.createTestWorkUnitState(); + state3.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,true); Review comment: There is a missing space here. You can run the "Reformat Code" command in IntelliJ to fix all the code style issues. If you want to fix only your code, you can first select it in the editor, and then run the command. ########## File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java ########## @@ -88,6 +89,7 @@ public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false; public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = "gobblin.copy.task.overwrite.on.commit"; public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = false; + public static final String STAGING_DIR_SUFFIX = "/staging"; Review comment: There are a couple of things we need to keep in mind regarding this folder: 1. Since we are going to put those in every dataset folder, we need to have a way to tell apart folders that are part of the dataset vs our temporary storage places. 2. If our folder name starts with a dot ".", it will be hidden by default in UI and CLI tools, so users won't be distracted by those folders. 3. We'll need to have a way to find all such folders and delete them from time to time. If a job gets interrupted and does not clean up after itself, some automation would need to go and delete old temporary files. Otherwise those abandoned temp files will consume all of the storage space. I think we can go with ".tmp" to ensure that this folder is hidden and is clearly marked as temporary. 
########## File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java ########## @@ -139,17 +142,20 @@ public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId URI uri = URI.create(uriStr); this.fs = FileSystem.get(uri, conf); this.fileContext = FileContext.getFileContext(uri, conf); + this.copyableDatasetMetadata = + CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET)); if (state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) { this.stagingDir = new Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR)); + } else if (state.getPropAsBoolean(ConfigurationKeys.DATASET_DEFINED_STAGING_DIR_FLAG,false)) { + this.stagingDir = new Path(this.copyableDatasetMetadata.getDatasetURN() + STAGING_DIR_SUFFIX); Review comment: You can pass multiple arguments to Path to concatenate them. It's better to avoid concatenating path strings, because you can end up with multiple or no slashes in the path. For example, depending on user input, you can end up with "/first/second", "/firstsecond" or "/first//second". 
########## File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java ########## @@ -139,17 +142,20 @@ public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId URI uri = URI.create(uriStr); this.fs = FileSystem.get(uri, conf); this.fileContext = FileContext.getFileContext(uri, conf); + this.copyableDatasetMetadata = + CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET)); if (state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) { this.stagingDir = new Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR)); + } else if (state.getPropAsBoolean(ConfigurationKeys.DATASET_DEFINED_STAGING_DIR_FLAG,false)) { + this.stagingDir = new Path(this.copyableDatasetMetadata.getDatasetURN() + STAGING_DIR_SUFFIX); Review comment: URN is a standard with a special format like "urn:linkedin:data". Looks like we'll have a path and not a URN here. I wonder if there are other methods that will return the dataset location, or whether we can rename this method to something like getDatasetURI() or getDatasetURL(), depending on what we can get here. See also the discussion on the differences between URI/URL/URN - https://stackoverflow.com/a/28865728 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org