This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit e8661ecbc49e750b3e962aa999f11b2a94d8fe27
Author: Ashutosh Mestry <[email protected]>
AuthorDate: Mon Mar 2 08:14:04 2020 -0800

    ATLAS-3641: Import Service: Support zipDirect format of import.
---
 .../atlas/model/impexp/AtlasImportRequest.java     |  20 +-
 .../java/org/apache/atlas/utils/AtlasJson.java     |   4 +
 .../atlas/repository/impexp/ImportService.java     |  17 +-
 .../repository/impexp/ZipExportFileNames.java      |   4 +
 .../atlas/repository/impexp/ZipSourceDirect.java   | 333 +++++++++++++++++++++
 .../migration/ZipFileMigrationImporter.java        |   8 +-
 .../atlas/repository/impexp/ImportServiceTest.java |  15 +
 .../atlas/repository/impexp/ZipDirectTest.java     |  83 +++++
 8 files changed, 474 insertions(+), 10 deletions(-)

diff --git 
a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java 
b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 0b3ede9..3362bf1 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -44,10 +44,13 @@ public class AtlasImportRequest implements Serializable {
     public  static final String TRANSFORMS_KEY             = "transforms";
     public  static final String TRANSFORMERS_KEY           = "transformers";
     public  static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
+    public  static final String OPTION_KEY_FORMAT          = "format";
+    public  static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
     private static final String START_POSITION_KEY         = "startPosition";
     private static final String START_GUID_KEY             = "startGuid";
     private static final String FILE_NAME_KEY              = "fileName";
     private static final String UPDATE_TYPE_DEFINITION_KEY = 
"updateTypeDefinition";
+    private static final String OPTION_KEY_STREAM_SIZE     = "size";
 
     private Map<String, String> options;
 
@@ -108,7 +111,7 @@ public class AtlasImportRequest implements Serializable {
             return null;
         }
 
-        return (String) this.options.get(key);
+        return this.options.get(key);
     }
 
     @JsonIgnore
@@ -127,4 +130,17 @@ public class AtlasImportRequest implements Serializable {
             options = new HashMap<>();
         }
         options.put(key, value);
-    }}
+    }
+
+    public void setSizeOption(int size) {
+        setOption(OPTION_KEY_STREAM_SIZE, Integer.toString(size));
+    }
+
+    public int getSizeOption() {
+        if (!this.options.containsKey(OPTION_KEY_STREAM_SIZE)) {
+            return 1;
+        }
+
+        return Integer.valueOf(this.options.get(OPTION_KEY_STREAM_SIZE));
+    }
+}
diff --git a/intg/src/main/java/org/apache/atlas/utils/AtlasJson.java 
b/intg/src/main/java/org/apache/atlas/utils/AtlasJson.java
index 1c13860..abeddf6 100644
--- a/intg/src/main/java/org/apache/atlas/utils/AtlasJson.java
+++ b/intg/src/main/java/org/apache/atlas/utils/AtlasJson.java
@@ -251,6 +251,10 @@ public class AtlasJson {
         return ret;
     }
 
+    public static ObjectCodec getMapper() {
+        return mapper;
+    }
+
     static class DateSerializer extends JsonSerializer<Date> {
         @Override
         public void serialize(Date value, JsonGenerator jgen, 
SerializerProvider provider) throws IOException {
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 27001e3..1964ade 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -92,7 +92,7 @@ public class ImportService {
             request = new AtlasImportRequest();
         }
 
-        EntityImportStream source = createZipSource(inputStream, 
AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
+        EntityImportStream source = createZipSource(request, inputStream, 
AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
         return run(source, request, userName, hostName, requestingIP);
     }
 
@@ -248,8 +248,13 @@ public class ImportService {
         return (int) (endTime - startTime);
     }
 
-    private EntityImportStream createZipSource(InputStream inputStream, String 
configuredTemporaryDirectory) throws AtlasBaseException {
+    private EntityImportStream createZipSource(AtlasImportRequest request, 
InputStream inputStream, String configuredTemporaryDirectory) throws 
AtlasBaseException {
         try {
+            if 
(request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
+                    
request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT)
 ) {
+                return getZipDirectEntityImportStream(request, inputStream);
+            }
+
             if (StringUtils.isEmpty(configuredTemporaryDirectory)) {
                 return new ZipSource(inputStream);
             }
@@ -260,9 +265,15 @@ public class ImportService {
         }
     }
 
+    private EntityImportStream 
getZipDirectEntityImportStream(AtlasImportRequest request, InputStream 
inputStream) throws IOException, AtlasBaseException {
+        ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, 
request.getSizeOption());
+        LOG.info("Using ZipSourceDirect: Size: {} entities", 
zipSourceDirect.size());
+        return zipSourceDirect;
+    }
+
     @VisibleForTesting
     boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest 
importRequest, AtlasExportRequest exportRequest) {
-        if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
+        if (exportRequest == null || 
CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
             return false;
         }
 
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
index 351b475..8347b91 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
@@ -31,4 +31,8 @@ public enum ZipExportFileNames {
     public String toString() {
         return this.name;
     }
+
+    public String toEntryFileName() {
+        return this.name + ".json";
+    }
 }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
new file mode 100644
index 0000000..cb5a7ac
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.utils.AtlasJson;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP;
+
+public class ZipSourceDirect implements EntityImportStream {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZipSourceDirect.class);
+    private static final String ZIP_ENTRY_ENTITIES = "entities.json";
+
+    private final ZipInputStream zipInputStream;
+    private int currentPosition;
+
+    private ImportTransforms importTransform;
+    private List<BaseEntityHandler> entityHandlers;
+    private AtlasTypesDef typesDef;
+    private int streamSize = 1;
+
+    EntitiesArrayParser entitiesArrayParser;
+
+    public ZipSourceDirect(InputStream inputStream, int streamSize) throws 
IOException, AtlasBaseException {
+        this.zipInputStream = new ZipInputStream(inputStream);
+        this.streamSize = streamSize;
+        prepareStreamForFetch();
+    }
+
+    @Override
+    public ImportTransforms getImportTransform() {
+        return this.importTransform;
+    }
+
+    @Override
+    public void setImportTransform(ImportTransforms importTransform) {
+        this.importTransform = importTransform;
+    }
+
+    @Override
+    public List<BaseEntityHandler> getEntityHandlers() {
+        return entityHandlers;
+    }
+
+    @Override
+    public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
+        this.entityHandlers = entityHandlers;
+    }
+
+    @Override
+    public AtlasTypesDef getTypesDef() throws AtlasBaseException {
+        return this.typesDef;
+    }
+
+    @Override
+    public AtlasExportResult getExportResult() throws AtlasBaseException {
+        return new AtlasExportResult();
+    }
+
+    @Override
+    public List<String> getCreationOrder() {
+        return new ArrayList<>();
+    }
+
+    @Override
+    public int getPosition() {
+        return currentPosition;
+    }
+
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String 
json) throws AtlasBaseException {
+        if (StringUtils.isEmpty(json)) {
+            return null;
+        }
+
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = 
convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json);
+
+        if (importTransform != null) {
+            entityWithExtInfo = importTransform.apply(entityWithExtInfo);
+        }
+
+        if (entityHandlers != null) {
+            applyTransformers(entityWithExtInfo);
+        }
+
+        return entityWithExtInfo;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return (this.entitiesArrayParser != null && 
entitiesArrayParser.hasNext());
+    }
+
+    @Override
+    public AtlasEntity next() {
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = 
getNextEntityWithExtInfo();
+
+        return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : 
null;
+    }
+
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+        try {
+            if (hasNext()) {
+                String json = moveNext();
+                return getEntityWithExtInfo(json);
+            }
+        } catch (AtlasBaseException e) {
+            LOG.error("getNextEntityWithExtInfo", e);
+        }
+        return null;
+    }
+
+    @Override
+    public void reset() {
+        currentPosition = 0;
+    }
+
+    @Override
+    public AtlasEntity getByGuid(String guid) {
+        try {
+            return getEntity(guid);
+        } catch (AtlasBaseException e) {
+            LOG.error("getByGuid: {} failed!", guid, e);
+            return null;
+        }
+    }
+
+    @Override
+    public void onImportComplete(String guid) {
+    }
+
+    @Override
+    public void setPosition(int index) {
+        try {
+            for (int i = 0; i < index; i++) {
+                moveNextEntry();
+            }
+        } catch (IOException e) {
+            LOG.error("Error setting position: {}. Position may be beyond the 
stream size.", index);
+        }
+    }
+
+    @Override
+    public void setPositionUsingEntityGuid(String guid) {
+    }
+
+    @Override
+    public void close() {
+        if (this.entitiesArrayParser != null) {
+            this.entitiesArrayParser.close();
+        }
+    }
+
+    private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo 
entityWithExtInfo) {
+        if (entityWithExtInfo == null) {
+            return;
+        }
+
+        transform(entityWithExtInfo.getEntity());
+
+        if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
+            for (AtlasEntity e : 
entityWithExtInfo.getReferredEntities().values()) {
+                transform(e);
+            }
+        }
+    }
+
+    private void transform(AtlasEntity e) {
+        for (BaseEntityHandler handler : entityHandlers) {
+            handler.transform(e);
+        }
+    }
+
+    private <T> T convertFromJson(Class<T> clazz, String jsonData) throws 
AtlasBaseException {
+        try {
+            return AtlasType.fromJson(jsonData, clazz);
+
+        } catch (Exception e) {
+            throw new AtlasBaseException("Error converting file to JSON.", e);
+        }
+    }
+
+    private AtlasEntity getEntity(String guid) throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo extInfo = 
getEntityWithExtInfo(guid);
+        return (extInfo != null) ? extInfo.getEntity() : null;
+    }
+
+    public int size() {
+        return this.streamSize;
+    }
+
+    private String moveNext() {
+        try {
+            moveNextEntry();
+            return entitiesArrayParser.next();
+        } catch (IOException e) {
+            LOG.error("moveNext failed!", e);
+        }
+
+        return null;
+    }
+
+    private void moveNextEntry() throws IOException {
+        this.currentPosition++;
+    }
+
+    private void prepareStreamForFetch() throws AtlasBaseException, 
IOException {
+        ZipEntry zipEntryNext = zipInputStream.getNextEntry();
+        if (zipEntryNext == null) {
+            throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, 
"Attempting to import empty ZIP.");
+        }
+
+        if 
(zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toEntryFileName()))
 {
+            String json = 
getJsonPayloadFromZipEntryStream(this.zipInputStream);
+            this.typesDef = AtlasType.fromJson(json, AtlasTypesDef.class);
+        }
+
+        zipEntryNext = zipInputStream.getNextEntry();
+        if (zipEntryNext.getName().equals(ZIP_ENTRY_ENTITIES)) {
+            this.entitiesArrayParser = new EntitiesArrayParser(zipInputStream);
+        } else {
+            throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, 
"Attempting to import empty ZIP. " + ZIP_ENTRY_ENTITIES + " could not be 
found!");
+        }
+    }
+
+    private String getJsonPayloadFromZipEntryStream(ZipInputStream 
zipInputStream) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try {
+            IOUtils.copy(zipInputStream, bos);
+        } catch (IOException e) {
+            LOG.error("Streaming copying failed!", e);
+            return null;
+        }
+        return bos.toString();
+    }
+
+    static class EntitiesArrayParser {
+        private static final String EMPTY_OBJECT = "{}";
+
+        private final JsonFactory factory;
+        private final JsonParser parser;
+        private boolean hasNext;
+
+        public EntitiesArrayParser(InputStream inputStream) throws IOException 
{
+            this.factory = AtlasJson.getMapper().getFactory();
+            this.parser = factory.createParser(inputStream);
+
+            parseNext();
+        }
+
+        public String next() throws IOException {
+            JsonToken jsonToken = parseNext();
+            if (!hasNext) {
+                return null;
+            }
+
+            if (jsonToken != null && jsonToken == JsonToken.START_OBJECT) {
+                JsonNode node = parser.readValueAsTree();
+                return validate(node.toString());
+            }
+            return null;
+
+        }
+
+        private JsonToken parseNext() throws IOException {
+            JsonToken jsonToken = this.parser.nextToken();
+            hasNext = (jsonToken != null) && (jsonToken != 
JsonToken.END_ARRAY);
+            return jsonToken;
+        }
+
+        private String validate(String payload) {
+            if (payload.equals(EMPTY_OBJECT)) {
+                hasNext = false;
+                close();
+                return null;
+            }
+
+            return payload;
+        }
+
+        public boolean hasNext() {
+            return hasNext;
+        }
+
+        public void close() {
+            try {
+                this.parser.close();
+            } catch (IOException e) {
+                LOG.error("Error closing parser!", e);
+            }
+        }
+    }
+}
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
 
b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index ca0bc41..f552525 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -84,10 +84,8 @@ public class ZipFileMigrationImporter implements Runnable {
     }
 
     private AtlasImportRequest getImportRequest() throws AtlasException {
-        return new AtlasImportRequest();
-    }
-
-    private String getPropertyValue(String property, String defaultValue) 
throws AtlasException {
-        return ApplicationProperties.get().getString(property, defaultValue);
+        AtlasImportRequest request = new AtlasImportRequest();
+        request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, 
AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT);
+        return request;
     }
 }
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index c14850f..116ffa7 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -186,6 +186,11 @@ public class ImportServiceTest extends 
ExportImportTestBase {
         return getZipSource("salesNewTypeAttrs-next.zip");
     }
 
+    @DataProvider(name = "zip-direct-3")
+    public static Object[][] getZipDirect3(ITestContext context) throws 
IOException, AtlasBaseException {
+        return getZipSource("zip-direct-3.zip");
+    }
+
     @Test(dataProvider = "salesNewTypeAttrs-next", dependsOnMethods = 
"importDB4")
     public void importDB5(InputStream inputStream) throws AtlasBaseException, 
IOException {
         final String newEnumDefName = "database_action";
@@ -346,6 +351,16 @@ public class ImportServiceTest extends 
ExportImportTestBase {
         }
     }
 
+    @Test(dataProvider = "zip-direct-3", expectedExceptions = 
AtlasBaseException.class)
+    public void zipDirectSample(InputStream inputStream) throws IOException, 
AtlasBaseException {
+        loadBaseModel();
+        loadFsModel();
+
+        AtlasImportRequest request = new AtlasImportRequest();
+        request.setOption(AtlasImportRequest.OPTION_KEY_FORMAT, 
AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT);
+        runImportWithParameters(importService, request, inputStream);
+    }
+
     @DataProvider(name = "relationshipLineage")
     public static Object[][] getImportWithRelationships(ITestContext context) 
throws IOException, AtlasBaseException {
         return getZipSource("rel-lineage.zip");
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java
 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java
new file mode 100644
index 0000000..faa31c3
--- /dev/null
+++ 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+public class ZipDirectTest {
+    @Test(expectedExceptions = AtlasBaseException.class)
+    public void loadFileEmpty() throws IOException, AtlasBaseException {
+        InputStream inputStream = 
ZipFileResourceTestUtils.getFileInputStream("zip-direct-1.zip");
+        new ZipSourceDirect(inputStream, 1);
+    }
+
+    @Test
+    public void loadFile() throws IOException, AtlasBaseException {
+        final int EXPECTED_ENTITY_COUNT = 3;
+
+        InputStream inputStream = 
ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip");
+        ZipSourceDirect zipSourceDirect = new ZipSourceDirect(inputStream, 
EXPECTED_ENTITY_COUNT);
+
+        assertNotNull(zipSourceDirect);
+        assertNotNull(zipSourceDirect.getTypesDef());
+        assertTrue(zipSourceDirect.getTypesDef().getEntityDefs().size() > 0);
+        assertNotNull(zipSourceDirect.getExportResult());
+
+        int count = 0;
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
+        while((entityWithExtInfo = zipSourceDirect.getNextEntityWithExtInfo()) 
!= null) {
+            assertNotNull(entityWithExtInfo);
+            count++;
+        }
+
+        assertEquals(count, EXPECTED_ENTITY_COUNT);
+    }
+
+    @Test
+    public void entitiesParserTest() throws IOException {
+        String object1 = "{\"type\":\"hdfs_path\"}";
+        String object2 = "{\"type\":\"hive_db\"}";
+        String entities = "[" + object1 + "," + object2 + ",{}]";
+        InputStream inputStream = new 
ByteArrayInputStream(entities.getBytes());
+        ZipSourceDirect.EntitiesArrayParser entitiesArrayParser = new 
ZipSourceDirect.EntitiesArrayParser(inputStream);
+
+        Object o = entitiesArrayParser.next();
+
+        assertNotNull(o);
+        assertEquals(o, object1);
+
+        o = entitiesArrayParser.next();
+        assertEquals(o, object2);
+
+        o = entitiesArrayParser.next();
+        assertNull(o);
+    }
+}

Reply via email to