ATLAS-2843: AtlasClient updates for exportData and importData.
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/20aa9be0 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/20aa9be0 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/20aa9be0 Branch: refs/heads/branch-1.0 Commit: 20aa9be0af63e2c0d5022202fb9014ed861de86d Parents: 0cd5bc5 Author: Ashutosh Mestry <ames...@hortonworks.com> Authored: Mon Aug 27 10:32:19 2018 -0700 Committer: Ashutosh Mestry <ames...@hortonworks.com> Committed: Thu Nov 1 15:42:54 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/atlas/AtlasBaseClient.java | 28 +-- .../repository/impexp/ImportTransforms.java | 170 +++++++++++-------- .../repository/impexp/ImportTransformsTest.java | 7 + .../web/resources/AdminExportImportTestIT.java | 19 +-- 4 files changed, 134 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/20aa9be0/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java ---------------------------------------------------------------------- diff --git a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java index c247902..ca772a7 100644 --- a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java +++ b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java @@ -59,10 +59,10 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; -import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.ConnectException; import java.net.URI; @@ -385,7 +385,7 @@ public abstract class AtlasBaseClient { } try { if(api.getProduces().equals(MediaType.APPLICATION_OCTET_STREAM)) { - return (T) IOUtils.toByteArray(clientResponse.getEntityInputStream()); + return (T) clientResponse.getEntityInputStream(); } else if (responseType.getRawClass().equals(ObjectNode.class)) { String stringEntity = clientResponse.getEntity(String.class); try { @@ -404,8 +404,6 @@ public abstract class AtlasBaseClient { } } catch (ClientHandlerException e) { throw new AtlasServiceException(api, e); - } catch (IOException e) { - throw new AtlasServiceException(api, e); } } else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) { break; @@ -467,9 +465,9 @@ public abstract class AtlasBaseClient { return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasBaseClient.DEFAULT_NUM_RETRIES); } - public byte[] exportData(AtlasExportRequest request) throws AtlasServiceException { + public InputStream exportData(AtlasExportRequest request) throws AtlasServiceException { try { - return (byte[]) callAPI(EXPORT, Object.class, request); + return (InputStream) callAPI(EXPORT, Object.class, request); } catch (Exception e) { LOG.error("error writing to file", e); throw new AtlasServiceException(e); @@ -479,14 +477,22 @@ public abstract class AtlasBaseClient { public void exportData(AtlasExportRequest request, String absolutePath) throws AtlasServiceException { OutputStream fileOutputStream = null; try { - byte[] fileBytes = exportData(request); + InputStream inputStream = exportData(request); fileOutputStream = new FileOutputStream(new File(absolutePath)); - IOUtils.write(fileBytes, fileOutputStream); + byte[] buffer = new byte[8 * 1024]; + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) != -1) { + fileOutputStream.write(buffer, 0, bytesRead); + } + + IOUtils.closeQuietly(inputStream); + IOUtils.closeQuietly(fileOutputStream); + } catch (Exception e) { LOG.error("error writing to file", e); throw new AtlasServiceException(e); } finally { - if(fileOutputStream != null) { + if (fileOutputStream != null) { try { fileOutputStream.close(); } catch (IOException e) { @@ -502,9 +508,9 @@ public abstract class AtlasBaseClient { new FileDataBodyPart(IMPORT_DATA_PARAMETER, new File(absoluteFilePath))); } - public AtlasImportResult importData(AtlasImportRequest request, byte[] fileData) throws AtlasServiceException { + public AtlasImportResult importData(AtlasImportRequest request, InputStream stream) throws AtlasServiceException { return performImportData(getImportRequestBodyPart(request), - new StreamDataBodyPart(IMPORT_DATA_PARAMETER, new ByteArrayInputStream(fileData))); + new StreamDataBodyPart(IMPORT_DATA_PARAMETER, stream)); } private AtlasImportResult performImportData(BodyPart requestPart, BodyPart filePart) throws AtlasServiceException { http://git-wip-us.apache.org/repos/asf/atlas/blob/20aa9be0/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java index 2f27448..72b684b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransforms.java @@ -35,17 +35,16 @@ import java.util.Set; public class ImportTransforms { private static final Logger LOG = LoggerFactory.getLogger(ImportTransforms.class); - private Map<String, Map<String, List<ImportTransformer>>> transforms; + private static final String ALL_ATTRIBUTES = "*"; + private Map<String, Map<String, List<ImportTransformer>>> transforms; public static ImportTransforms fromJson(String jsonString) { - ImportTransforms ret = null; - - if (StringUtils.isNotBlank(jsonString)) { - ret = new ImportTransforms(jsonString); + if (StringUtils.isEmpty(jsonString)) { + return null; } - return ret; + return new ImportTransforms(jsonString); } public Map<String, Map<String, List<ImportTransformer>>> getTransforms() { @@ -72,13 +71,15 @@ public class ImportTransforms { } public AtlasEntity.AtlasEntityWithExtInfo apply(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException { - if (entityWithExtInfo != null) { - apply(entityWithExtInfo.getEntity()); + if (entityWithExtInfo == null) { + return entityWithExtInfo; + } - if(MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) { - for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { - apply(e); - } + apply(entityWithExtInfo.getEntity()); + + if(MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) { + for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { + apply(e); } } @@ -86,30 +87,46 @@ public class ImportTransforms { } public AtlasEntity apply(AtlasEntity entity) throws AtlasBaseException { - if(entity != null) { - Map<String, List<ImportTransformer>> entityTransforms = getTransforms(entity.getTypeName()); + if (entity == null) { + return entity; + } + + Map<String, List<ImportTransformer>> entityTransforms = getTransforms(entity.getTypeName()); + if (MapUtils.isEmpty(entityTransforms)) { + return entity; + } + + applyEntitySpecific(entity, entityTransforms); - if (MapUtils.isNotEmpty(entityTransforms)) { - for (Map.Entry<String, List<ImportTransformer>> entry : entityTransforms.entrySet()) { - String attributeName = entry.getKey(); - List<ImportTransformer> attrTransforms = entry.getValue(); + applyAttributeSpecific(entity, entityTransforms); - if (!entity.hasAttribute(attributeName)) { - continue; - } + return entity; + } - Object transformedValue = entity.getAttribute(attributeName); + private void applyAttributeSpecific(AtlasEntity entity, Map<String, List<ImportTransformer>> entityTransforms) throws AtlasBaseException { + for (Map.Entry<String, List<ImportTransformer>> entry : entityTransforms.entrySet()) { + String attributeName = entry.getKey(); + List<ImportTransformer> attrTransforms = entry.getValue(); - for (ImportTransformer attrTransform : attrTransforms) { - transformedValue = attrTransform.apply(transformedValue); - } + if (!entity.hasAttribute(attributeName)) { + continue; + } - entity.setAttribute(attributeName, transformedValue); - } + Object attributeValue = entity.getAttribute(attributeName); + for (ImportTransformer attrTransform : attrTransforms) { + attributeValue = attrTransform.apply(attributeValue); } + + entity.setAttribute(attributeName, attributeValue); } + } - return entity; + private void applyEntitySpecific(AtlasEntity entity, Map<String, List<ImportTransformer>> entityTransforms) throws AtlasBaseException { + if(entityTransforms.containsKey(ALL_ATTRIBUTES)) { + for (ImportTransformer attrTransform : entityTransforms.get(ALL_ATTRIBUTES)) { + attrTransform.apply(entity); + } + } } private ImportTransforms() { @@ -119,38 +136,58 @@ public class ImportTransforms { private ImportTransforms(String jsonString) { this(); - if(jsonString != null) { - Map typeTransforms = AtlasType.fromJson(jsonString, Map.class); - - if (MapUtils.isNotEmpty(typeTransforms)) { - for (Object key : typeTransforms.keySet()) { - Object value = typeTransforms.get(key); - String entityType = (String) key; - Map<String, Object> attributeTransforms = (Map<String, Object>)value; - - if (MapUtils.isNotEmpty(attributeTransforms)) { - for (Map.Entry<String, Object> e : attributeTransforms.entrySet()) { - String attributeName = e.getKey(); - List<String> transforms = (List<String>)e.getValue(); - - if (CollectionUtils.isNotEmpty(transforms)) { - for (String transform : transforms) { - ImportTransformer transformers = null; - - try { - transformers = ImportTransformer.getTransformer(transform); - } catch (AtlasBaseException ex) { - LOG.error("Error converting string to ImportTransformer: {}", transform, ex); - } - - if (transformers != null) { - add(entityType, attributeName, transformers); - } - } - } - } - } + if (StringUtils.isEmpty(jsonString)) { + return; + } + + Map typeTransforms = AtlasType.fromJson(jsonString, Map.class); + if (MapUtils.isEmpty(typeTransforms)) { + return; + } + + addOuterMap(typeTransforms); + } + + private void addOuterMap(Map typeTransforms) { + for (Object key : typeTransforms.keySet()) { + Object value = typeTransforms.get(key); + String entityType = (String) key; + Map<String, Object> attributeTransforms = (Map<String, Object>)value; + + if (MapUtils.isEmpty(attributeTransforms)) { + continue; + } + + addInnerMap(entityType, attributeTransforms); + } + } + + private void addInnerMap(String entityType, Map<String, Object> attributeTransforms) { + for (Map.Entry<String, Object> e : attributeTransforms.entrySet()) { + String attributeName = e.getKey(); + List<String> transforms = (List<String>)e.getValue(); + + if (CollectionUtils.isEmpty(transforms)) { + continue; + } + + addTransforms(entityType, attributeName, transforms); + } + } + + private void addTransforms(String entityType, String attributeName, List<String> transforms) { + for (String transform : transforms) { + ImportTransformer transformers = null; + + try { + transformers = ImportTransformer.getTransformer(transform); + if (transformers == null) { + continue; } + + add(entityType, attributeName, transformers); + } catch (AtlasBaseException ex) { + LOG.error("Error converting string to ImportTransformer: {}", transform, ex); } } } @@ -158,21 +195,16 @@ public class ImportTransforms { private void add(String typeName, String attributeName, ImportTransformer transformer) { Map<String, List<ImportTransformer>> attrMap; - if(transforms.containsKey(typeName)) { - attrMap = transforms.get(typeName); - } else { + if(!transforms.containsKey(typeName)) { attrMap = new HashMap<>(); transforms.put(typeName, attrMap); } - List<ImportTransformer> list; - if(attrMap.containsKey(attributeName)) { - list = attrMap.get(attributeName); - } else { - list = new ArrayList<>(); - attrMap.put(attributeName, list); + attrMap = transforms.get(typeName); + if(!attrMap.containsKey(attributeName)) { + attrMap.put(attributeName, new ArrayList<ImportTransformer>()); } - list.add(transformer); + attrMap.get(attributeName).add(transformer); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/20aa9be0/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java index 1e8211a..cd623d0 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java @@ -128,6 +128,7 @@ public class ImportTransformsTest { t.apply(entity); + assertEquals(entity.getClassifications().size(), 0); assertNotNull(t); assertEquals(entity.getAttribute(ATTR_NAME_QUALIFIED_NAME), expected_qualifiedName); } @@ -145,6 +146,7 @@ public class ImportTransformsTest { assertNotNull(t); assertEquals(entity.getAttribute(ATTR_NAME_QUALIFIED_NAME), expected_qualifiedName); + assertEquals(entity.getAttribute(HIVE_TABLE_ATTR_SYNC_INFO), new ArrayList<String>() {{ add(expected_syncInfo); }}); } @@ -161,6 +163,8 @@ public class ImportTransformsTest { t.apply(entity); assertNotNull(t); + assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_FROM)); + assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_TO)); } @Test @@ -173,6 +177,8 @@ public class ImportTransformsTest { t.apply(entity); assertNotNull(t); + assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_FROM)); + assertNull(entity.getAttribute(HIVE_TABLE_ATTR_REPLICATED_TO)); } @Test @@ -185,6 +191,7 @@ public class ImportTransformsTest { t.apply(entity); assertNotNull(t); + assertEquals(entity.getStatus(), AtlasEntity.Status.DELETED); } @Test http://git-wip-us.apache.org/repos/asf/atlas/blob/20aa9be0/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java index cc5d36b..d156054 100644 --- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java @@ -32,15 +32,13 @@ import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.Test; -import java.io.ByteArrayInputStream; +import java.io.FileInputStream; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; +import java.io.InputStream; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; public class AdminExportImportTestIT extends BaseResourceIT { @@ -69,10 +67,10 @@ public class AdminExportImportTestIT extends BaseResourceIT { final int EXPECTED_CREATION_ORDER_SIZE = 10; AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(".", EXPORT_REQUEST_FILE, AtlasExportRequest.class); - byte[] exportedBytes = atlasClientV2.exportData(request); - assertNotNull(exportedBytes); + InputStream exportedStream = atlasClientV2.exportData(request); + assertNotNull(exportedStream); - ZipSource zs = new ZipSource(new ByteArrayInputStream(exportedBytes)); + ZipSource zs = new ZipSource(exportedStream); assertNotNull(zs.getExportResult()); assertTrue(zs.getCreationOrder().size() > EXPECTED_CREATION_ORDER_SIZE); } @@ -87,14 +85,15 @@ public class AdminExportImportTestIT extends BaseResourceIT { private void performImport(String fileToImport, AtlasImportRequest request) throws AtlasServiceException { - byte[] fileBytes = new byte[0]; + FileInputStream fileInputStream = null; + try { - fileBytes = Files.readAllBytes(Paths.get(TestResourceFileUtils.getTestFilePath(fileToImport))); + fileInputStream = new FileInputStream(TestResourceFileUtils.getTestFilePath(fileToImport)); } catch (IOException e) { assertFalse(true, "Exception: " + e.getMessage()); } - AtlasImportResult result = atlasClientV2.importData(request, fileBytes); + AtlasImportResult result = atlasClientV2.importData(request, fileInputStream); assertNotNull(result); assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS); assertNotNull(result.getMetrics());