This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit bc0f7e04ce2dfe8fc7c89e0f80d2d98bf04c83dc Author: Hussain Towaileb <[email protected]> AuthorDate: Mon Mar 30 16:10:30 2020 +0300 [ASTERIXDB-2697]: Implementing AWS s3 as external data source - user model changes: yes - storage format changes: no - interface changes: no Details: - Added an external reader for AWS S3. - Updated query translator to include the WITH parameters into the dataset details when creating an external dataset. - Added test case for AWS S3 using an S3 mocking server to avoid using real credentials. Change-Id: I71d89116c0bb404c9621b16f21a6a31cbf7bb7f6 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5025 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Reviewed-by: Dmitry Lychagin <[email protected]> --- asterixdb/asterix-app/pom.xml | 33 +++ .../asterix/app/external/ExternalLibraryUtils.java | 2 +- .../asterix/app/translator/QueryTranslator.java | 22 ++ .../asterix/hyracks/bootstrap/CCApplication.java | 10 +- .../org/apache/asterix/utils/FeedOperations.java | 12 +- .../aws/AwsS3ExternalDatasetTest.java | 215 ++++++++++++++++++ .../resources/runtimets/only_external_dataset.xml | 23 ++ .../aws/s3/000/external_dataset.000.ddl.sqlpp} | 23 +- .../aws/s3/000/external_dataset.001.query.sqlpp} | 10 +- .../aws/s3/000/external_dataset.002.ddl.sqlpp} | 9 +- .../aws/s3/000/external_dataset.001.adm | 14 ++ .../results/feeds/feeds_01/feeds_01.1.adm | 2 +- .../runtimets/testsuite_external_dataset.xml | 28 +++ .../common/dataflow/ICcApplicationContext.java | 16 ++ .../asterix/common/external}/IAdapterFactory.java | 31 +-- .../common/external/IAdapterFactoryService.java} | 15 +- .../common/external}/IDataSourceAdapter.java | 2 +- asterixdb/asterix-external-data/pom.xml | 12 + .../factory/AdapterFactoryService.java} | 18 +- .../adapter/factory/GenericAdapterFactory.java | 6 +- .../external/api/IIndexingAdapterFactory.java | 2 +- 
.../asterix/external/api/ITypedAdapterFactory.java | 54 +++++ .../external/dataset/adapter/FeedAdapter.java | 2 +- .../external/dataset/adapter/GenericAdapter.java | 2 +- .../input/record/reader/aws/AwsS3InputStream.java | 163 ++++++++++++++ .../record/reader/aws/AwsS3InputStreamFactory.java | 250 +++++++++++++++++++++ .../record/reader/aws/AwsS3ReaderFactory.java | 90 ++++++++ .../operators/ExternalScanOperatorDescriptor.java | 8 +- .../operators/FeedIntakeOperatorDescriptor.java | 16 +- .../operators/FeedIntakeOperatorNodePushable.java | 4 +- .../external/provider/AdapterFactoryProvider.java | 9 +- .../provider/DatasourceFactoryProvider.java | 1 + .../util/ExternalDataCompatibilityUtils.java | 7 +- .../external/util/ExternalDataConstants.java | 23 ++ ...pache.asterix.external.api.IRecordReaderFactory | 1 + .../library/adapter/TestTypedAdapterFactory.java | 6 +- .../asterix/lang/common/statement/DatasetDecl.java | 4 +- .../common/util/DatasetDeclParametersUtil.java | 18 +- .../metadata/bootstrap/MetadataBootstrap.java | 7 +- .../metadata/declared/DatasetDataSource.java | 4 +- .../metadata/declared/LoadableDataSource.java | 4 +- .../metadata/declared/MetadataProvider.java | 18 +- .../metadata/entities/DatasourceAdapter.java | 2 +- .../DatasourceAdapterTupleTranslator.java | 2 +- .../asterix/metadata/feeds/FeedMetadataUtil.java | 22 +- .../metadata/utils/ExternalIndexingOperations.java | 4 +- .../runtime/utils/CcApplicationContext.java | 12 +- asterixdb/asterix-server/pom.xml | 6 + asterixdb/pom.xml | 79 +++++++ ...streams_reactive-streams-jvm_v1.0.2_COPYING.txt | 121 ++++++++++ ...streams_reactive-streams-jvm_v1.0.2_LICENSE.txt | 8 + 51 files changed, 1313 insertions(+), 139 deletions(-) diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml index ac2c303..219595b 100644 --- a/asterixdb/asterix-app/pom.xml +++ b/asterixdb/asterix-app/pom.xml @@ -699,5 +699,38 @@ </exclusion> </exclusions> </dependency> + <!-- AWS --> + <dependency> + 
<groupId>software.amazon.awssdk</groupId> + <artifactId>sdk-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>s3</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>regions</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>auth</artifactId> + <scope>test</scope> + </dependency> + <!-- Mock for AWS S3 --> + <dependency> + <groupId>io.findify</groupId> + <artifactId>s3mock_2.12</artifactId> + <scope>test</scope> + </dependency> + <!-- Needed for the s3 mock --> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-http-core_2.12</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java index a989941..c185340 100755 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java @@ -34,9 +34,9 @@ import javax.xml.bind.Unmarshaller; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.external.IDataSourceAdapter; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.asterix.external.dataset.adapter.AdapterIdentifier; import org.apache.asterix.external.library.ExternalLibrary; import org.apache.asterix.external.library.LibraryAdapter; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index ce1a354..b8a048d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -155,6 +155,8 @@ import org.apache.asterix.metadata.utils.KeyFieldTypeUtil; import org.apache.asterix.metadata.utils.MetadataConstants; import org.apache.asterix.metadata.utils.MetadataLockUtil; import org.apache.asterix.metadata.utils.MetadataUtil; +import org.apache.asterix.object.base.AdmObjectNode; +import org.apache.asterix.object.base.AdmStringNode; import org.apache.asterix.om.base.IAObject; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; @@ -646,9 +648,16 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen keySourceIndicators, partitioningTypes, autogenerated, filterField); break; case EXTERNAL: + validateExternalDatasetRequirements(appCtx, metadataProvider, mdTxnCtx, dd); String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter(); Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties(); + // Add the withObjectNode items to the external dataset properties + if (!dd.getWithObjectNode().isEmpty()) { + AdmObjectNode withObjectNode = dd.getWithObjectNode(); + dd.getWithObjectNode().getFieldNames().iterator().forEachRemaining(fieldName -> properties + .put(fieldName, ((AdmStringNode) withObjectNode.get(fieldName)).get())); + } datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(), TransactionState.COMMIT); break; @@ -3051,4 +3060,17 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId()); } } + + /** + * Performs any required validation before creating an external dataset + * + 
* @param appContext {@link ICcApplicationContext} context + * @param metadataProvider {@link MetadataProvider} metadata provider + * @param mdTxnCtx {@link MetadataTransactionContext} metadata transaction context + * @param datasetDecl {@link DatasetDecl} dataset declaration statement + */ + protected void validateExternalDatasetRequirements(ICcApplicationContext appContext, + MetadataProvider metadataProvider, MetadataTransactionContext mdTxnCtx, DatasetDecl datasetDecl) + throws Exception { + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 26c092f..fc912b0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -69,9 +69,11 @@ import org.apache.asterix.common.config.PropertiesAccessor; import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.external.IAdapterFactoryService; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.utils.Servlets; +import org.apache.asterix.external.adapter.factory.AdapterFactoryService; import org.apache.asterix.external.library.ExternalLibraryManager; import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.messaging.CCMessageBroker; @@ -154,7 +156,7 @@ public class CCApplication extends BaseCCApplication { ccExtensionManager = new CCExtensionManager(new ArrayList<>(getExtensions())); IGlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager(); appCtx = createApplicationContext(libraryManager, 
globalRecoveryManager, lifecycleCoordinator, - () -> new Receptionist("CC"), ConfigValidator::new, ccExtensionManager); + () -> new Receptionist("CC"), ConfigValidator::new, ccExtensionManager, new AdapterFactoryService()); final CCConfig ccConfig = controllerService.getCCConfig(); if (System.getProperty("java.rmi.server.hostname") == null) { System.setProperty("java.rmi.server.hostname", ccConfig.getClusterPublicAddress()); @@ -182,10 +184,12 @@ public class CCApplication extends BaseCCApplication { protected ICcApplicationContext createApplicationContext(ILibraryManager libraryManager, IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator lifecycleCoordinator, IReceptionistFactory receptionistFactory, IConfigValidatorFactory configValidatorFactory, - CCExtensionManager ccExtensionManager) throws AlgebricksException, IOException { + CCExtensionManager ccExtensionManager, IAdapterFactoryService adapterFactoryService) + throws AlgebricksException, IOException { return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE, globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider, - new MetadataLockManager(), receptionistFactory, configValidatorFactory, ccExtensionManager); + new MetadataLockManager(), receptionistFactory, configValidatorFactory, ccExtensionManager, + adapterFactoryService); } protected IGlobalRecoveryManager createGlobalRecoveryManager() throws Exception { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java index b74f4c6..fc64d99 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java @@ -42,7 +42,7 @@ import org.apache.asterix.common.functions.FunctionSignature; import 
org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.compiler.provider.SqlppCompilationProvider; -import org.apache.asterix.external.api.IAdapterFactory; +import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.external.feed.watch.FeedActivityDetails; @@ -135,14 +135,14 @@ public class FeedOperations { private FeedOperations() { } - private static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed feed, + private static Pair<JobSpecification, ITypedAdapterFactory> buildFeedIntakeJobSpec(Feed feed, MetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception { JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); spec.setFrameSize(metadataProvider.getApplicationContext().getCompilerProperties().getFrameSize()); - IAdapterFactory adapterFactory; + ITypedAdapterFactory adapterFactory; IOperatorDescriptor feedIngestor; AlgebricksPartitionConstraint ingesterPc; - Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t = + Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> t = metadataProvider.buildFeedIntakeRuntime(spec, feed, policyAccessor); feedIngestor = t.first; ingesterPc = t.second; @@ -447,13 +447,13 @@ public class FeedOperations { MetadataProvider metadataProvider, Feed feed, List<FeedConnection> feedConnections, IStatementExecutor statementExecutor, IHyracksClientConnection hcc) throws Exception { FeedPolicyAccessor fpa = new FeedPolicyAccessor(new HashMap<>()); - Pair<JobSpecification, IAdapterFactory> intakeInfo = buildFeedIntakeJobSpec(feed, metadataProvider, fpa); + Pair<JobSpecification, ITypedAdapterFactory> intakeInfo = buildFeedIntakeJobSpec(feed, metadataProvider, fpa); 
List<JobSpecification> jobsList = new ArrayList<>(); // TODO: Figure out a better way to handle insert/upsert per conn instead of per feed Boolean insertFeed = ExternalDataUtils.isInsertFeed(feed.getConfiguration()); // Construct the ingestion Job JobSpecification intakeJob = intakeInfo.getLeft(); - IAdapterFactory ingestionAdaptorFactory = intakeInfo.getRight(); + ITypedAdapterFactory ingestionAdaptorFactory = intakeInfo.getRight(); String[] ingestionLocations = ingestionAdaptorFactory.getPartitionConstraint().getLocations(); // Add metadata configs metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, Boolean.TRUE.toString()); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java new file mode 100644 index 0000000..3b4cdf8 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.test.external_dataset.aws; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.test.common.TestExecutor; +import org.apache.asterix.test.runtime.ExecutionTestUtil; +import org.apache.asterix.test.runtime.LangExecutionUtil; +import org.apache.asterix.testframework.context.TestCaseContext; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import io.findify.s3mock.S3Mock; +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +/** + * Runs an AWS S3 mock server and test it as an external dataset + */ +@RunWith(Parameterized.class) +public class AwsS3ExternalDatasetTest { + + private static final Logger LOGGER = LogManager.getLogger(); + + protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf"; + + // S3 mock server + private static S3Mock s3MockServer; + + // IMPORTANT: The following values must be used in the AWS S3 test case + private static S3Client client; + private static final String S3_MOCK_SERVER_BUCKET = "playground"; + private static final String S3_MOCK_SERVER_BUCKET_DEFINITION = "json-data/reviews/"; // data resides 
here + private static final String S3_MOCK_SERVER_REGION = "us-west-2"; + private static final int S3_MOCK_SERVER_PORT = 8001; + private static final String S3_MOCK_SERVER_HOSTNAME = "http://localhost:" + S3_MOCK_SERVER_PORT; + + @BeforeClass + public static void setUp() throws Exception { + final TestExecutor testExecutor = new TestExecutor(); + LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor); + setNcEndpoints(testExecutor); + startAwsS3MockServer(); + } + + @AfterClass + public static void tearDown() throws Exception { + LangExecutionUtil.tearDown(); + + // Shutting down S3 mock server + LOGGER.info("Shutting down S3 mock server and client"); + if (client != null) { + client.close(); + } + if (s3MockServer != null) { + s3MockServer.shutdown(); + } + LOGGER.info("S3 mock down and client shut down successfully"); + } + + @Parameters(name = "SqlppExecutionTest {index}: {0}") + public static Collection<Object[]> tests() throws Exception { + return LangExecutionUtil.tests("only_external_dataset.xml", "testsuite_external_dataset.xml"); + } + + protected TestCaseContext tcCtx; + + public AwsS3ExternalDatasetTest(TestCaseContext tcCtx) { + this.tcCtx = tcCtx; + } + + @Test + public void test() throws Exception { + LangExecutionUtil.test(tcCtx); + } + + private static void setNcEndpoints(TestExecutor testExecutor) { + final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs; + final Map<String, InetSocketAddress> ncEndPoints = new HashMap<>(); + final String ip = InetAddress.getLoopbackAddress().getHostAddress(); + for (NodeControllerService nc : ncs) { + final String nodeId = nc.getId(); + final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext(); + int apiPort = appCtx.getExternalProperties().getNcApiPort(); + ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort)); + } + testExecutor.setNcEndPoints(ncEndPoints); + } + + /** + * Starts the AWS s3 mocking server and loads some files for testing 
+ */ + private static void startAwsS3MockServer() { + // Starting S3 mock server to be used instead of real S3 server + LOGGER.info("Starting S3 mock server"); + s3MockServer = new S3Mock.Builder().withPort(S3_MOCK_SERVER_PORT).withInMemoryBackend().build(); + s3MockServer.start(); + LOGGER.info("S3 mock server started successfully"); + + // Create a client and add some files to the S3 mock server + LOGGER.info("Creating S3 client to load initial files to S3 mock server"); + S3ClientBuilder builder = S3Client.builder(); + URI endpoint = URI.create(S3_MOCK_SERVER_HOSTNAME); // endpoint pointing to S3 mock server + builder.region(Region.of(S3_MOCK_SERVER_REGION)).credentialsProvider(AnonymousCredentialsProvider.create()) + .endpointOverride(endpoint); + client = builder.build(); + LOGGER.info("Client created successfully"); + + // Create the bucket and upload some json files + prepareS3Bucket(); + } + + /** + * Creates a bucket and fills it with some files for testing purpose. + */ + private static void prepareS3Bucket() { + LOGGER.info("creating bucket " + S3_MOCK_SERVER_BUCKET); + client.createBucket(CreateBucketRequest.builder().bucket(S3_MOCK_SERVER_BUCKET).build()); + LOGGER.info("bucket created successfully"); + + LOGGER.info("Adding JSON files to the bucket"); + client.putObject( + PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET) + .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "0.json").build(), + RequestBody.fromString("{\"id\": 1, \"year\": null, \"quarter\": null, \"review\": \"good\"}")); + client.putObject( + PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET) + .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "1.json").build(), + RequestBody.fromString("{\"id\": 2, \"year\": null, \"quarter\": null, \"review\": \"good\"}")); + client.putObject( + PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET) + .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/1.json").build(), + RequestBody.fromString("{\"id\": 3, \"year\": 2018, \"quarter\": null, \"review\": 
\"good\"}")); + client.putObject( + PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET) + .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/2.json").build(), + RequestBody.fromString("{\"id\": 4, \"year\": 2018, \"quarter\": null, \"review\": \"bad\"}")); + client.putObject( + PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET) + .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/q1/1.json").build(), + RequestBody.fromString("{\"id\": 5, \"year\": 2018, \"quarter\": 1, \"review\": \"good\"}")); + client.putObject( + PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET) + .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/q1/2.json").build(), + RequestBody.fromString("{\"id\": 6, \"year\": 2018, \"quarter\": 1, \"review\": \"bad\"}")); + client.putObject( + PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET) + .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/q2/1.json").build(), + RequestBody.fromString("{\"id\": 7, \"year\": 2018, \"quarter\": 2, \"review\": \"good\"}")); + client.putObject( + PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET) + .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/q2/2.json").build(), + RequestBody.fromString("{\"id\": 8, \"year\": 2018, \"quarter\": 2, \"review\": \"bad\"}")); + client.putObject( + PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET) + .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/1.json").build(), + RequestBody.fromString("{\"id\": 9, \"year\": 2019, \"quarter\": null, \"review\": \"good\"}")); + client.putObject( + PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET) + .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/2.json").build(), + RequestBody.fromString("{\"id\": 10, \"year\": 2019, \"quarter\": null, \"review\": \"bad\"}")); + client.putObject( + PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET) + .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/q1/1.json").build(), + RequestBody.fromString("{\"id\": 11, \"year\": 2019, \"quarter\": 1, \"review\": \"good\"}")); + client.putObject( + 
PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET) + .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/q1/2.json").build(), + RequestBody.fromString("{\"id\": 12, \"year\": 2019, \"quarter\": 1, \"review\": \"bad\"}")); + client.putObject( + PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET) + .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/q2/1.json").build(), + RequestBody.fromString("{\"id\": 13, \"year\": 2019, \"quarter\": 2, \"review\": \"good\"}")); + client.putObject( + PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET) + .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/q2/2.json").build(), + RequestBody.fromString("{\"id\": 14, \"year\": 2019, \"quarter\": 2, \"review\": \"bad\"}")); + LOGGER.info("Files added successfully"); + } +} diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/only_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/only_external_dataset.xml new file mode 100644 index 0000000..334dd52 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/only_external_dataset.xml @@ -0,0 +1,23 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + ! Licensed to the Apache Software Foundation (ASF) under one + ! or more contributor license agreements. See the NOTICE file + ! distributed with this work for additional information + ! regarding copyright ownership. The ASF licenses this file + ! to you under the Apache License, Version 2.0 (the + ! "License"); you may not use this file except in compliance + ! with the License. You may obtain a copy of the License at + ! + ! http://www.apache.org/licenses/LICENSE-2.0 + ! + ! Unless required by applicable law or agreed to in writing, + ! software distributed under the License is distributed on an + ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ! KIND, either express or implied. See the License for the + ! specific language governing permissions and limitations + ! under the License. 
+ !--> +<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp"> + <test-group name="failed"> + </test-group> +</test-suite> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.000.ddl.sqlpp similarity index 66% copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.000.ddl.sqlpp index 37cc1cf..9c6a994 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.000.ddl.sqlpp @@ -16,12 +16,23 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.external.api; -import java.util.List; +drop dataverse test if exists; +create dataverse test; +use test; -import org.apache.asterix.external.indexing.ExternalFile; +drop type test if exists; +create type test as open { +}; + +drop dataset test if exists; +create external dataset test(test) using S3 ( +("accessKey"="dummyAccessKey"), +("secretKey"="dummySecretKey"), +("region"="us-west-2"), +("serviceEndpoint"="http://localhost:8001"), +("container"="playground"), +("definition"="json-data/reviews"), +("format"="json") +); -public interface IIndexingAdapterFactory extends IAdapterFactory { - public void setSnapshot(List<ExternalFile> files, boolean indexingOp); -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.001.query.sqlpp similarity index 75% copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.001.query.sqlpp index 37cc1cf..2dd9cc5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.001.query.sqlpp @@ -16,12 +16,10 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.external.api; -import java.util.List; +use test; -import org.apache.asterix.external.indexing.ExternalFile; +from test +select value test +order by id asc; -public interface IIndexingAdapterFactory extends IAdapterFactory { - public void setSnapshot(List<ExternalFile> files, boolean indexingOp); -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.002.ddl.sqlpp similarity index 75% copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.002.ddl.sqlpp index 37cc1cf..548e632 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.002.ddl.sqlpp @@ -16,12 +16,5 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.external.api; -import java.util.List; - -import org.apache.asterix.external.indexing.ExternalFile; - -public interface IIndexingAdapterFactory extends IAdapterFactory { - public void setSnapshot(List<ExternalFile> files, boolean indexingOp); -} +drop dataverse test if exists; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/000/external_dataset.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/000/external_dataset.001.adm new file mode 100644 index 0000000..a7ce908 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/000/external_dataset.001.adm @@ -0,0 +1,14 @@ +{ "id": 1, "year": null, "quarter": null, "review": "good" } +{ "id": 2, "year": null, "quarter": null, "review": "good" } +{ "id": 3, "year": 2018, "quarter": null, "review": "good" } +{ "id": 4, "year": 2018, "quarter": null, "review": "bad" } +{ "id": 5, "year": 2018, "quarter": 1, "review": "good" } +{ "id": 6, "year": 2018, "quarter": 1, "review": "bad" } +{ "id": 7, "year": 2018, "quarter": 2, "review": "good" } +{ "id": 8, "year": 2018, "quarter": 2, "review": "bad" } +{ "id": 9, "year": 2019, "quarter": null, "review": "good" } +{ "id": 10, "year": 2019, "quarter": null, "review": "bad" } +{ "id": 11, "year": 2019, "quarter": 1, "review": "good" } +{ "id": 12, "year": 2019, "quarter": 1, "review": "bad" } +{ "id": 13, "year": 2019, "quarter": 2, "review": "good" } +{ "id": 14, "year": 2019, "quarter": 2, "review": "bad" } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm index 85cd967..1dc31dc 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm +++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm @@ -1 +1 @@ -{ "DataverseName": "feeds", "FeedName": "TweetFeed", "AdapterConfiguration": {{ { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "feed", "Value": "TweetFeed" }, { "Name": "adapter-name", "Value": "localfs" }, { "Name": "is-feed", "Value": "true" }, { "Name": "parser", "Value": "adm" }, { "Name": "reader", "Value": "localfs" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" }, { "Name": "type-name", "Value": "TweetType" [...] +{ "DataverseName": "feeds", "FeedName": "TweetFeed", "AdapterConfiguration": {{ { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "feed", "Value": "TweetFeed" }, { "Name": "adapter-name", "Value": "localfs" }, { "Name": "is-feed", "Value": "true" }, { "Name": "parser", "Value": "adm" }, { "Name": "reader", "Value": "localfs" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" }, { "Name": "linkName", "Value": "localfs" }, [...] \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml new file mode 100644 index 0000000..cd1fb12 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + ! Licensed to the Apache Software Foundation (ASF) under one + ! or more contributor license agreements. See the NOTICE file + ! distributed with this work for additional information + ! regarding copyright ownership. The ASF licenses this file + ! to you under the Apache License, Version 2.0 (the + ! "License"); you may not use this file except in compliance + ! with the License. You may obtain a copy of the License at + ! + ! 
http://www.apache.org/licenses/LICENSE-2.0 + ! + ! Unless required by applicable law or agreed to in writing, + ! software distributed under the License is distributed on an + ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ! KIND, either express or implied. See the License for the + ! specific language governing permissions and limitations + ! under the License. + !--> +<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp"> + <test-group name="external-dataset"> + <test-case FilePath="external-dataset"> + <compilation-unit name="aws/s3/000"> + <output-dir compare="Text">aws/s3/000</output-dir> + </compilation-unit> + </test-case> + </test-group> +</test-suite> diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java index 3389962..5fc1bb7 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.dataflow; import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.api.ICoordinationService; import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.api.INodeJobTracker; import org.apache.asterix.common.api.IRequestTracker; @@ -26,6 +27,7 @@ import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.ExtensionProperties; import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.external.IAdapterFactoryService; import org.apache.asterix.common.metadata.IMetadataBootstrap; import 
org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.storage.ICompressionManager; @@ -127,4 +129,18 @@ public interface ICcApplicationContext extends IApplicationContext { * @return the request tracker. */ IRequestTracker getRequestTracker(); + + /** + * Gets the coordination service + * + * @return the coordination service + */ + ICoordinationService getCoordinationService(); + + /** + * Gets the adapter factory service + * + * @return the adapter factory service + */ + IAdapterFactoryService getAdapterFactoryService(); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java similarity index 82% rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java index 40bc7d8..e2e7e3e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java @@ -16,12 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.api; +package org.apache.asterix.common.external; import java.io.Serializable; import java.util.Map; -import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; @@ -60,8 +59,8 @@ public interface IAdapterFactory extends Serializable { /** * Creates an instance of IDatasourceAdapter. 
* - * @param HyracksTaskContext - * @param partition + * @param ctx HyracksTaskContext + * @param partition partition number * @return An instance of IDatasourceAdapter. * @throws Exception */ @@ -77,28 +76,4 @@ public interface IAdapterFactory extends Serializable { */ void configure(IServiceContext serviceContext, Map<String, String> configuration) throws HyracksDataException, AlgebricksException; - - /** - * Set the expected record output type of the adapter - * - * @param outputType - */ - void setOutputType(ARecordType outputType); - - /** - * Set the expected meta output type of the adapter - * - * @param metaType - */ - void setMetaType(ARecordType metaType); - - /** - * @return the adapter record output type - */ - ARecordType getOutputType(); - - /** - * @return the adapter meta output type - */ - ARecordType getMetaType(); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactoryService.java similarity index 75% copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java copy to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactoryService.java index 37cc1cf..55e25b7 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactoryService.java @@ -16,12 +16,15 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.external.api; +package org.apache.asterix.common.external; -import java.util.List; +@FunctionalInterface +public interface IAdapterFactoryService { -import org.apache.asterix.external.indexing.ExternalFile; - -public interface IIndexingAdapterFactory extends IAdapterFactory { - public void setSnapshot(List<ExternalFile> files, boolean indexingOp); + /** + * Creates and returns and adapter factory + * + * @return adapter factory + */ + IAdapterFactory createAdapterFactory(); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java similarity index 97% rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java index 472cdae..18f59f2 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.external.api; +package org.apache.asterix.common.external; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index 605fbe5..30e7770 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -435,5 +435,17 @@ <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>s3</artifactId> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>regions</artifactId> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>auth</artifactId> + </dependency> </dependencies> </project> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/AdapterFactoryService.java similarity index 63% copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/AdapterFactoryService.java index 37cc1cf..aaf2002 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/AdapterFactoryService.java @@ -16,12 +16,20 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.external.api; +package org.apache.asterix.external.adapter.factory; -import java.util.List; +import org.apache.asterix.common.external.IAdapterFactoryService; +import org.apache.asterix.external.api.ITypedAdapterFactory; -import org.apache.asterix.external.indexing.ExternalFile; +public class AdapterFactoryService implements IAdapterFactoryService { -public interface IIndexingAdapterFactory extends IAdapterFactory { - public void setSnapshot(List<ExternalFile> files, boolean indexingOp); + /** + * Creates and returns an adapter factory + * + * @return adaptor factory + */ + @Override + public ITypedAdapterFactory createAdapterFactory() { + return new GenericAdapterFactory(); + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java index fc59f68..d081e56 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java @@ -25,14 +25,14 @@ import java.util.Map; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.external.IDataSourceAdapter; import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IDataFlowController; import org.apache.asterix.external.api.IDataParserFactory; -import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.asterix.external.api.IExternalDataSourceFactory; import org.apache.asterix.external.api.IIndexibleExternalDataSource; import 
org.apache.asterix.external.api.IIndexingAdapterFactory; +import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.dataset.adapter.FeedAdapter; import org.apache.asterix.external.dataset.adapter.GenericAdapter; @@ -59,7 +59,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterFactory { +public class GenericAdapterFactory implements IIndexingAdapterFactory, ITypedAdapterFactory { private static final long serialVersionUID = 1L; private static final Logger LOGGER = LogManager.getLogger(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java index 37cc1cf..8d42046 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java @@ -22,6 +22,6 @@ import java.util.List; import org.apache.asterix.external.indexing.ExternalFile; -public interface IIndexingAdapterFactory extends IAdapterFactory { +public interface IIndexingAdapterFactory extends ITypedAdapterFactory { public void setSnapshot(List<ExternalFile> files, boolean indexingOp); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITypedAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITypedAdapterFactory.java new file mode 100644 index 0000000..13e3b34 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITypedAdapterFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.api; + +import org.apache.asterix.common.external.IAdapterFactory; +import org.apache.asterix.om.types.ARecordType; + +/** + * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory. + * Acts as a marker interface indicating that the implementation provides functionality + * for creating an adapter. 
+ */ +public interface ITypedAdapterFactory extends IAdapterFactory { + + /** + * Set the expected record output type of the adapter + * + * @param outputType + */ + void setOutputType(ARecordType outputType); + + /** + * Set the expected meta output type of the adapter + * + * @param metaType + */ + void setMetaType(ARecordType metaType); + + /** + * @return the adapter record output type + */ + ARecordType getOutputType(); + + /** + * @return the adapter meta output type + */ + ARecordType getMetaType(); +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java index 2a92d40..0ab59fe 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java @@ -21,7 +21,7 @@ package org.apache.asterix.external.dataset.adapter; import java.io.Closeable; import java.io.IOException; -import org.apache.asterix.external.api.IDataSourceAdapter; +import org.apache.asterix.common.external.IDataSourceAdapter; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java index 916fe0a..0904384 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java @@ -18,8 +18,8 @@ */ package 
org.apache.asterix.external.dataset.adapter; +import org.apache.asterix.common.external.IDataSourceAdapter; import org.apache.asterix.external.api.IDataFlowController; -import org.apache.asterix.external.api.IDataSourceAdapter; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java new file mode 100644 index 0000000..cfa1f6a --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.input.record.reader.aws; + +import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3Constants; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.List; +import java.util.Map; + +import org.apache.asterix.external.api.AsterixInputStream; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.CleanupUtils; + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; + +public class AwsS3InputStream extends AsterixInputStream { + + // Configuration + private final Map<String, String> configuration; + + private final S3Client s3Client; + + // File fields + private final List<String> filePaths; + private int nextFileIndex = 0; + + // File reading fields + private InputStream inputStream; + + public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) { + this.configuration = configuration; + this.filePaths = filePaths; + + this.s3Client = buildAwsS3Client(configuration); + } + + @Override + public int read() throws IOException { + throw new HyracksDataException( + "read() is not supported with this stream. 
use read(byte[] b, int off, int len)"); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (inputStream == null) { + if (!advance()) { + return -1; + } + } + + int result = inputStream.read(b, off, len); + + // If file reading is done, go to the next file, or finish up if no files are left + if (result < 0) { + if (advance()) { + result = inputStream.read(b, off, len); + } else { + return -1; + } + } + + return result; + } + + private boolean advance() throws IOException { + // No files to read for this partition + if (filePaths == null || filePaths.isEmpty()) { + return false; + } + + // Finished reading all the files + if (nextFileIndex == filePaths.size()) { + if (inputStream != null) { + inputStream.close(); + } + return false; + } + + // Close the current stream before going to the next one + if (inputStream != null) { + inputStream.close(); + } + + String bucket = configuration.get(AwsS3Constants.CONTAINER_NAME_FIELD_NAME); + GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder(); + GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build(); + inputStream = s3Client.getObject(getObjectRequest); + + // Current file ready, point to the next file + nextFileIndex++; + return true; + } + + @Override + public boolean stop() { + return false; + } + + @Override + public boolean handleException(Throwable th) { + return false; + } + + @Override + public void close() throws IOException { + if (inputStream != null) { + CleanupUtils.close(inputStream, null); + } + } + + /** + * Prepares and builds the Amazon S3 client with the provided configuration + * + * @param configuration S3 client configuration + * + * @return Amazon S3 client + */ + private static S3Client buildAwsS3Client(Map<String, String> configuration) { + S3ClientBuilder builder = S3Client.builder(); + + // Credentials + String accessKey = configuration.get(AwsS3Constants.ACCESS_KEY_FIELD_NAME); + String 
secretKey = configuration.get(AwsS3Constants.SECRET_KEY_FIELD_NAME); + AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKey, secretKey); + builder.credentialsProvider(StaticCredentialsProvider.create(credentials)); + + // Region + String region = configuration.get(AwsS3Constants.REGION_FIELD_NAME); + builder.region(Region.of(region)); + + // Use user's endpoint if provided + if (configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME) != null) { + String endPoint = configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME); + builder.endpointOverride(URI.create(endPoint)); + } + + return builder.build(); + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java new file mode 100644 index 0000000..a9f7898 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.input.record.reader.aws; + +import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3Constants; + +import java.io.Serializable; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.external.api.AsterixInputStream; +import org.apache.asterix.external.api.IInputStreamFactory; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.application.IServiceContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.S3Object; + +public class AwsS3InputStreamFactory implements IInputStreamFactory { + + private static final long serialVersionUID = 1L; + private Map<String, String> configuration; + + // Files to read from + private List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>(); + + private transient AlgebricksAbsolutePartitionConstraint partitionConstraint; + + @Override + public DataSourceType getDataSourceType() { + return DataSourceType.STREAM; + } + + @Override + public boolean isIndexible() { + return false; + } + + 
@Override + public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) { + return new AwsS3InputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths()); + } + + @Override + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { + return partitionConstraint; + } + + @Override + public void configure(IServiceContext ctx, Map<String, String> configuration) + throws AlgebricksException, HyracksDataException { + this.configuration = configuration; + ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext(); + + String container = configuration.get(AwsS3Constants.CONTAINER_NAME_FIELD_NAME); + + S3Client s3Client = buildAwsS3Client(configuration); + + // Get all objects in a bucket and extract the paths to files + ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container); + String path = configuration.get(AwsS3Constants.DEFINITION_FIELD_NAME); + if (path != null) { + listObjectsBuilder.prefix(path + (path.endsWith("/") ? "" : "/")); + } + ListObjectsResponse listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build()); + List<S3Object> s3Objects = listObjectsResponse.contents(); + + // Exclude the directories and get the files only + String fileFormat = configuration.get(ExternalDataConstants.KEY_FORMAT); + List<S3Object> fileObjects = getFilesOnly(s3Objects, fileFormat); + + // Partition constraints + partitionConstraint = ccApplicationContext.getClusterStateManager().getClusterLocations(); + int partitionsCount = partitionConstraint.getLocations().length; + + // Distribute work load amongst the partitions + distributeWorkLoad(fileObjects, partitionsCount); + } + + /** + * AWS S3 returns all the objects as paths, not differentiating between folder and files. The path is considered + * a file if it does not end up with a "/" which is the separator in a folder structure. 
+ * + * @param s3Objects List of returned objects + * + * @return A list of string paths that point to files only + * + * @throws HyracksDataException HyracksDataException + */ + private List<S3Object> getFilesOnly(List<S3Object> s3Objects, String fileFormat) throws HyracksDataException { + List<S3Object> filesOnly = new ArrayList<>(); + String fileExtension = getFileExtension(fileFormat); + if (fileExtension == null) { + throw HyracksDataException.create(ErrorCode.INVALID_FORMAT); + } + + s3Objects.stream().filter(object -> object.key().endsWith(fileExtension)).forEach(filesOnly::add); + + return filesOnly; + } + + /** + * To efficiently utilize the parallelism, work load will be distributed amongst the partitions based on the file + * size. + * + * Example: + * File1 1mb, File2 300kb, File3 300kb, File4 300kb + * + * Distribution: + * Partition1: [File1] + * Partition2: [File2, File3, File4] + * + * @param fileObjects AWS S3 file objects + * @param partitionsCount Partitions count + */ + private void distributeWorkLoad(List<S3Object> fileObjects, int partitionsCount) { + // Prepare the workloads based on the number of partitions + for (int i = 0; i < partitionsCount; i++) { + partitionWorkLoadsBasedOnSize.add(new PartitionWorkLoadBasedOnSize()); + } + + for (S3Object object : fileObjects) { + PartitionWorkLoadBasedOnSize smallest = getSmallestWorkLoad(); + smallest.addFilePath(object.key(), object.size()); + } + } + + /** + * Finds the smallest workload and returns it + * + * @return the smallest workload + */ + private PartitionWorkLoadBasedOnSize getSmallestWorkLoad() { + PartitionWorkLoadBasedOnSize smallest = partitionWorkLoadsBasedOnSize.get(0); + for (PartitionWorkLoadBasedOnSize partition : partitionWorkLoadsBasedOnSize) { + // If the current total size is 0, add the file directly as this is a first time partition + if (partition.getTotalSize() == 0) { + smallest = partition; + break; + } + if (partition.getTotalSize() < smallest.getTotalSize()) { + 
smallest = partition; + } + } + + return smallest; + } + + /** + * Prepares and builds the Amazon S3 client with the provided configuration + * + * @param configuration S3 client configuration + * + * @return Amazon S3 client + */ + private static S3Client buildAwsS3Client(Map<String, String> configuration) { + S3ClientBuilder builder = S3Client.builder(); + + // Credentials + String accessKey = configuration.get(AwsS3Constants.ACCESS_KEY_FIELD_NAME); + String secretKey = configuration.get(AwsS3Constants.SECRET_KEY_FIELD_NAME); + AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKey, secretKey); + builder.credentialsProvider(StaticCredentialsProvider.create(credentials)); + + // Region + String region = configuration.get(AwsS3Constants.REGION_FIELD_NAME); + builder.region(Region.of(region)); + + // Use user's endpoint if provided + if (configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME) != null) { + String endPoint = configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME); + builder.endpointOverride(URI.create(endPoint)); + } + + return builder.build(); + } + + /** + * Returns the file extension for the provided file format. + * + * @param format file format + * + * @return file extension for the provided file format, null otherwise. 
+ */ + private String getFileExtension(String format) { + switch (format.toLowerCase()) { + case "json": + return ".json"; + default: + return null; + } + } + + private static class PartitionWorkLoadBasedOnSize implements Serializable { + private static final long serialVersionUID = 1L; + private List<String> filePaths = new ArrayList<>(); + private long totalSize = 0; + + PartitionWorkLoadBasedOnSize() { + } + + public List<String> getFilePaths() { + return filePaths; + } + + public void addFilePath(String filePath, long size) { + this.filePaths.add(filePath); + this.totalSize += size; + } + + public long getTotalSize() { + return totalSize; + } + + @Override + public String toString() { + return "Files: " + filePaths.size() + ", Total Size: " + totalSize; + } + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java new file mode 100644 index 0000000..e78783a --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.record.reader.aws; + +import java.lang.reflect.InvocationTargetException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.asterix.external.api.IRecordReader; +import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader; +import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory; +import org.apache.asterix.external.provider.StreamRecordReaderProvider; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.application.IServiceContext; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class AwsS3ReaderFactory extends StreamRecordReaderFactory { + + private static final long serialVersionUID = 1L; + + private static final List<String> recordReaderNames = + Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3); + + @Override + public List<String> getRecordReaderNames() { + return recordReaderNames; + } + + @Override + public DataSourceType getDataSourceType() { + return DataSourceType.RECORDS; + } + + @Override + public Class<?> getRecordClass() { + return char[].class; + } + + @Override + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException { + return streamFactory.getPartitionConstraint(); + } + + @Override + public void configure(IServiceContext ctx, Map<String, String> configuration) + throws AlgebricksException, HyracksDataException { + this.configuration = configuration; + + // Stream factory + streamFactory = new AwsS3InputStreamFactory(); + 
streamFactory.configure(ctx, configuration); + + // record reader + recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration); + } + + @Override + public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition) + throws HyracksDataException { + try { + StreamRecordReader streamRecordReader = + (StreamRecordReader) recordReaderClazz.getConstructor().newInstance(); + streamRecordReader.configure(streamFactory.createInputStream(ctx, partition), configuration); + return streamRecordReader; + } catch (InstantiationException | IllegalAccessException | InvocationTargetException + | NoSuchMethodException e) { + throw HyracksDataException.create(e); + } + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java index 081d49e..4fd5151 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java @@ -18,8 +18,8 @@ */ package org.apache.asterix.external.operators; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.api.IDataSourceAdapter; +import org.apache.asterix.common.external.IDataSourceAdapter; +import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; @@ -37,10 +37,10 @@ public class ExternalScanOperatorDescriptor extends AbstractSingleActivityOperat private static final long serialVersionUID = 1L; - private IAdapterFactory adapterFactory; + private ITypedAdapterFactory adapterFactory; 
public ExternalScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc, - IAdapterFactory dataSourceAdapterFactory) { + ITypedAdapterFactory dataSourceAdapterFactory) { super(spec, 0, 1); outRecDescs[0] = rDesc; this.adapterFactory = dataSourceAdapterFactory; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java index 7a0341a..d63e8a8 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java @@ -25,7 +25,7 @@ import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.external.api.IAdapterFactory; +import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.external.feed.api.IFeed; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.om.types.ARecordType; @@ -57,7 +57,7 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator private final FeedPolicyAccessor policyAccessor; private final ARecordType adapterOutputType; /** The adaptor factory that is used to create an instance of the feed adaptor **/ - private IAdapterFactory adaptorFactory; + private ITypedAdapterFactory adaptorFactory; /** The library that contains the adapter in use. **/ private String adaptorLibraryName; /** @@ -68,7 +68,7 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator /** The configuration parameters associated with the adapter. 
**/ private Map<String, String> adaptorConfiguration; - public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, IAdapterFactory adapterFactory, + public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, ITypedAdapterFactory adapterFactory, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) { super(spec, 0, 1); this.feedId = new EntityId(FEED_EXTENSION_NAME, primaryFeed.getDataverseName(), primaryFeed.getFeedName()); @@ -100,15 +100,15 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, recordDescProvider, this); } - private IAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx) throws HyracksDataException { - IAdapterFactory adapterFactory; + private ITypedAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx) throws HyracksDataException { + ITypedAdapterFactory adapterFactory; INcApplicationContext runtimeCtx = (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); ILibraryManager libraryManager = runtimeCtx.getLibraryManager(); ClassLoader classLoader = libraryManager.getLibraryClassLoader(feedId.getDataverse(), adaptorLibraryName); if (classLoader != null) { try { - adapterFactory = (IAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance()); + adapterFactory = (ITypedAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance()); adapterFactory.setOutputType(adapterOutputType); adapterFactory.configure(ctx.getJobletContext().getServiceContext(), adaptorConfiguration); } catch (Exception e) { @@ -128,11 +128,11 @@ public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperator return feedId; } - public IAdapterFactory getAdaptorFactory() { + public ITypedAdapterFactory getAdaptorFactory() { return this.adaptorFactory; } - public void 
setAdaptorFactory(IAdapterFactory factory) { + public void setAdaptorFactory(ITypedAdapterFactory factory) { this.adaptorFactory = factory; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java index 98f75df..7002a23 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit; import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.ActiveSourceOperatorNodePushable; import org.apache.asterix.active.EntityId; -import org.apache.asterix.external.api.IAdapterFactory; +import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.external.dataset.adapter.FeedAdapter; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.VSizeFrame; @@ -50,7 +50,7 @@ public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePush private final FeedAdapter adapter; private boolean poisoned = false; - public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, EntityId feedId, IAdapterFactory adapterFactory, + public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, EntityId feedId, ITypedAdapterFactory adapterFactory, int partition, IRecordDescriptorProvider recordDescProvider, FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) throws HyracksDataException { super(ctx, new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition)); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java index 5740143..414c460 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java @@ -21,10 +21,11 @@ package org.apache.asterix.external.provider; import java.util.List; import java.util.Map; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.adapter.factory.GenericAdapterFactory; import org.apache.asterix.external.adapter.factory.LookupAdapterFactory; -import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IIndexingAdapterFactory; +import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.util.ExternalDataCompatibilityUtils; import org.apache.asterix.om.types.ARecordType; @@ -39,11 +40,13 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class AdapterFactoryProvider { // Adapters - public static IAdapterFactory getAdapterFactory(IServiceContext serviceCtx, String adapterName, + public static ITypedAdapterFactory getAdapterFactory(IServiceContext serviceCtx, String adapterName, Map<String, String> configuration, ARecordType itemType, ARecordType metaType) throws HyracksDataException, AlgebricksException { ExternalDataCompatibilityUtils.prepare(adapterName, configuration); - GenericAdapterFactory adapterFactory = new GenericAdapterFactory(); + ICcApplicationContext context = (ICcApplicationContext) serviceCtx.getApplicationContext(); + ITypedAdapterFactory adapterFactory = + (ITypedAdapterFactory) context.getAdapterFactoryService().createAdapterFactory(); adapterFactory.setOutputType(itemType); adapterFactory.setMetaType(metaType); adapterFactory.configure(serviceCtx, 
configuration); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java index 8024dc4..2a2289c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java @@ -53,6 +53,7 @@ public class DatasourceFactoryProvider { public static IExternalDataSourceFactory getExternalDataSourceFactory(ILibraryManager libraryManager, Map<String, String> configuration) throws HyracksDataException, AsterixException { + // Dispatch on the configured data source type to pick the matching factory if (ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.RECORDS)) { String reader = configuration.get(ExternalDataConstants.KEY_READER); return DatasourceFactoryProvider.getRecordReaderFactory(libraryManager, reader, configuration); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java index e222e99..77cbb96 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java @@ -60,8 +60,11 @@ public class ExternalDataCompatibilityUtils { } public static void prepare(String adapterName, Map<String, String> configuration) { - if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) { - configuration.put(ExternalDataConstants.KEY_READER, adapterName); + // Adapter name in some cases can carry the link name for external datasets, always add it to configuration + 
configuration.put(ExternalDataConstants.KEY_LINK_NAME, adapterName); + + if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) { + configuration.put(ExternalDataConstants.KEY_READER, adapterName); } if (!configuration.containsKey(ExternalDataConstants.KEY_PARSER)) { if (configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 729215e..e44144a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -114,6 +114,7 @@ public class ExternalDataConstants { public static final String KEY_ADAPTER_NAME_SOCKET = "socket"; public static final String KEY_ALIAS_ADAPTER_NAME_SOCKET = "socket_adapter"; public static final String KEY_ADAPTER_NAME_HTTP = "http_adapter"; + public static final String KEY_ADAPTER_NAME_AWS_S3 = "S3"; /** * HDFS class names @@ -229,4 +230,26 @@ public class ExternalDataConstants { public static final String FORMAT_CSV = "csv"; public static final String ERROR_PARSE_RECORD = "Parser failed to parse record"; + + // TODO(Hussain): Move link related items to a different place + /** + * Common external link fields + */ + public static final String KEY_DATAVERSE_NAME = "dataverseName"; + public static final String KEY_LINK_NAME = "linkName"; + public static final String KEY_LINK_TYPE = "linkType"; + public static final String[] KEY_EXTERNAL_DATASET_REQUIRED_CONNECTION_PARAMETERS = + new String[] { KEY_DATAVERSE_NAME, KEY_LINK_NAME, KEY_LINK_TYPE }; + + public static class AwsS3Constants { + public static final String REGION_FIELD_NAME = "region"; + public static final String ACCESS_KEY_FIELD_NAME = "accessKey"; + 
public static final String SECRET_KEY_FIELD_NAME = "secretKey"; + public static final String CONTAINER_NAME_FIELD_NAME = "container"; + public static final String DEFINITION_FIELD_NAME = "definition"; + public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint"; + public static final String[] REQUIRED_LINK_PARAMETERS = + new String[] { ACCESS_KEY_FIELD_NAME, SECRET_KEY_FIELD_NAME, REGION_FIELD_NAME }; + public static final String[] OPTIONAL_LINK_PARAMETERS = new String[] { SERVICE_END_POINT_FIELD_NAME }; + } } diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory index 0d96658..fd3e473 100644 --- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory +++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory @@ -20,3 +20,4 @@ org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory org.apache.asterix.external.input.HDFSDataSourceFactory org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory +org.apache.asterix.external.input.record.reader.aws.AwsS3ReaderFactory diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java index 8ee8a57..a947c7e 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java @@ 
-25,9 +25,9 @@ import java.util.Map; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.api.IDataSourceAdapter; +import org.apache.asterix.common.external.IDataSourceAdapter; import org.apache.asterix.external.api.IExternalDataSourceFactory; +import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.external.dataflow.TupleForwarder; import org.apache.asterix.external.parser.ADMDataParser; import org.apache.asterix.om.types.ARecordType; @@ -41,7 +41,7 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.std.file.ITupleParser; import org.apache.hyracks.dataflow.std.file.ITupleParserFactory; -public class TestTypedAdapterFactory implements IAdapterFactory { +public class TestTypedAdapterFactory implements ITypedAdapterFactory { private static final long serialVersionUID = 1L; diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java index 0a17b24..45fc33a 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java @@ -44,7 +44,7 @@ public class DatasetDecl extends AbstractStatement { protected final DatasetType datasetType; protected final IDatasetDetailsDecl datasetDetailsDecl; protected final Map<String, String> hints; - private final AdmObjectNode withObjectNode; + private AdmObjectNode withObjectNode; protected final boolean ifNotExists; public DatasetDecl(Identifier dataverse, Identifier name, Identifier itemTypeDataverse, Identifier 
itemTypeName, @@ -67,7 +67,7 @@ public class DatasetDecl extends AbstractStatement { } this.nodegroupName = nodeGroupName; this.hints = hints; - this.withObjectNode = DatasetDeclParametersUtil.validateAndGetWithObjectNode(withRecord); + this.withObjectNode = DatasetDeclParametersUtil.validateAndGetWithObjectNode(withRecord, datasetType); this.ifNotExists = ifNotExists; this.datasetType = datasetType; this.datasetDetailsDecl = idd; diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java index a26a638..52285d9 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.lang.common.util; +import org.apache.asterix.common.config.DatasetConfig; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.lang.common.expression.RecordConstructor; import org.apache.asterix.object.base.AdmObjectNode; @@ -60,14 +61,21 @@ public class DatasetDeclParametersUtil { private DatasetDeclParametersUtil() { } - public static AdmObjectNode validateAndGetWithObjectNode(RecordConstructor withRecord) throws CompilationException { + public static AdmObjectNode validateAndGetWithObjectNode(RecordConstructor withRecord, + DatasetConfig.DatasetType datasetType) throws CompilationException { if (withRecord == null) { return EMPTY_WITH_OBJECT; } - final ConfigurationTypeValidator validator = new ConfigurationTypeValidator(); - final AdmObjectNode node = ExpressionUtils.toNode(withRecord); - validator.validateType(WITH_OBJECT_TYPE, node); - return node; + + // Handle based on dataset type + if (datasetType == DatasetConfig.DatasetType.INTERNAL) { + final 
ConfigurationTypeValidator validator = new ConfigurationTypeValidator(); + final AdmObjectNode node = ExpressionUtils.toNode(withRecord); + validator.validateType(WITH_OBJECT_TYPE, node); + return node; + } else { + return ExpressionUtils.toNode(withRecord); + } } private static ARecordType getWithObjectType() { diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java index 49fffe6..3412941 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java @@ -35,13 +35,13 @@ import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.MetadataException; +import org.apache.asterix.common.external.IDataSourceAdapter; import org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory; import org.apache.asterix.common.ioopcallbacks.LSMIndexPageWriteCallbackFactory; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.external.adapter.factory.GenericAdapterFactory; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.api.IDataSourceAdapter; +import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.external.dataset.adapter.AdapterIdentifier; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.metadata.IDatasetDetails; @@ -294,7 +294,8 @@ public class MetadataBootstrap { private static DatasourceAdapter getAdapter(String adapterFactoryClassName) throws AlgebricksException { 
try { - String adapterName = ((IAdapterFactory) (Class.forName(adapterFactoryClassName).newInstance())).getAlias(); + String adapterName = + ((ITypedAdapterFactory) (Class.forName(adapterFactoryClassName).newInstance())).getAlias(); return new DatasourceAdapter(new AdapterIdentifier(MetadataConstants.METADATA_DATAVERSE_NAME, adapterName), adapterFactoryClassName, IDataSourceAdapter.AdapterType.INTERNAL); } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java index 62cce05..07bbc57 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java @@ -23,7 +23,7 @@ import java.util.List; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.external.api.IAdapterFactory; +import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.metadata.IDatasetDetails; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.entities.Dataset; @@ -111,7 +111,7 @@ public class DatasetDataSource extends DataSource { externalDataset.getItemTypeDataverseName(), itemTypeName).getDatatype(); ExternalDatasetDetails edd = (ExternalDatasetDetails) externalDataset.getDatasetDetails(); - IAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(externalDataset, + ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(externalDataset, edd.getAdapter(), edd.getProperties(), (ARecordType) itemType, null); return 
metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory); case INTERNAL: diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java index 3460a46..c2983af 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java @@ -26,7 +26,7 @@ import java.util.Map; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.external.api.IAdapterFactory; +import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.om.types.ARecordType; @@ -137,7 +137,7 @@ public class LoadableDataSource extends DataSource { } LoadableDataSource alds = (LoadableDataSource) dataSource; ARecordType itemType = (ARecordType) alds.getLoadedType(); - IAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(alds.getTargetDataset(), + ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(alds.getTargetDataset(), alds.getAdapter(), alds.getAdapterProperties(), itemType, null); RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); return metadataProvider.buildLoadableDatasetScan(jobSpec, adapterFactory, rDesc); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 0a72ceb..5bdf2a7 100644 --- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -39,6 +39,7 @@ import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.external.IDataSourceAdapter; import org.apache.asterix.common.metadata.LockList; import org.apache.asterix.common.storage.ICompressionManager; import org.apache.asterix.common.transactions.ITxnIdFactory; @@ -48,8 +49,7 @@ import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; import org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil; import org.apache.asterix.external.adapter.factory.LookupAdapterFactory; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.api.IDataSourceAdapter; +import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.indexing.IndexingConstants; @@ -416,7 +416,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan( - JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc) + JobSpecification jobSpec, ITypedAdapterFactory adapterFactory, RecordDescriptor rDesc) throws AlgebricksException { ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory); try { @@ -430,14 +430,14 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String> return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); } - public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime( + public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> buildFeedIntakeRuntime( JobSpecification jobSpec, Feed feed, FeedPolicyAccessor policyAccessor) throws Exception { - Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput; + Triple<ITypedAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput; factoryOutput = FeedMetadataUtil.getFeedFactoryAndOutput(feed, policyAccessor, mdTxnCtx, getApplicationContext()); ARecordType recordType = FeedMetadataUtil.getOutputType(feed, feed.getConfiguration().get(ExternalDataConstants.KEY_TYPE_NAME)); - IAdapterFactory adapterFactory = factoryOutput.first; + ITypedAdapterFactory adapterFactory = factoryOutput.first; FeedIntakeOperatorDescriptor feedIngestor = null; switch (factoryOutput.third) { case INTERNAL: @@ -775,11 +775,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> return numElementsHint / numPartitions; } - protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName, + protected ITypedAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName, Map<String, String> configuration, ARecordType itemType, ARecordType metaType) throws AlgebricksException { try { configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName()); - IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory( + ITypedAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory( getApplicationContext().getServiceContext(), adapterName, configuration, itemType, metaType); // check to see if dataset is indexed @@ -922,7 +922,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } 
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime( - JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory) throws AlgebricksException { + JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory adapterFactory) throws AlgebricksException { if (itemType.getTypeTag() != ATypeTag.OBJECT) { throw new AlgebricksException("Can only scan datasets of records."); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java index b72c058..c29fb93 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.metadata.entities; -import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType; +import org.apache.asterix.common.external.IDataSourceAdapter.AdapterType; import org.apache.asterix.external.dataset.adapter.AdapterIdentifier; import org.apache.asterix.metadata.MetadataCache; import org.apache.asterix.metadata.api.IMetadataEntity; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java index 8f630cf..9e65c08 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java @@ -24,7 +24,7 @@ import java.io.DataInput; import java.io.DataInputStream; import java.util.Calendar; -import 
org.apache.asterix.external.api.IDataSourceAdapter; +import org.apache.asterix.common.external.IDataSourceAdapter; import org.apache.asterix.external.dataset.adapter.AdapterIdentifier; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java index 3ae0fec..7ed53e4 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java @@ -28,9 +28,9 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.MetadataException; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.api.IDataSourceAdapter; -import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType; +import org.apache.asterix.common.external.IDataSourceAdapter; +import org.apache.asterix.common.external.IDataSourceAdapter.AdapterType; +import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.external.feed.api.IFeed; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.external.provider.AdapterFactoryProvider; @@ -117,20 +117,20 @@ public class FeedMetadataUtil { adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName); } AdapterType adapterType; - IAdapterFactory adapterFactory; + ITypedAdapterFactory adapterFactory; if (adapterEntity != null) { adapterType = adapterEntity.getType(); String adapterFactoryClassname = adapterEntity.getClassname(); 
switch (adapterType) { case INTERNAL: - adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance(); + adapterFactory = (ITypedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance(); break; case EXTERNAL: String[] anameComponents = adapterName.split("#"); String libraryName = anameComponents[0]; ClassLoader cl = appCtx.getLibraryManager().getLibraryClassLoader(feed.getDataverseName(), libraryName); - adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance(); + adapterFactory = (ITypedAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance(); break; default: throw new AsterixException("Unknown Adapter type " + adapterType); @@ -165,17 +165,17 @@ public class FeedMetadataUtil { } @SuppressWarnings("rawtypes") - public static Triple<IAdapterFactory, RecordDescriptor, AdapterType> getFeedFactoryAndOutput(Feed feed, + public static Triple<ITypedAdapterFactory, RecordDescriptor, AdapterType> getFeedFactoryAndOutput(Feed feed, FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx, ICcApplicationContext appCtx) throws AlgebricksException { // This method needs to be re-visited String adapterName = null; DatasourceAdapter adapterEntity = null; String adapterFactoryClassname = null; - IAdapterFactory adapterFactory = null; + ITypedAdapterFactory adapterFactory = null; ARecordType adapterOutputType = null; ARecordType metaType = null; - Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> feedProps = null; + Triple<ITypedAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> feedProps = null; IDataSourceAdapter.AdapterType adapterType = null; try { Map<String, String> configuration = feed.getConfiguration(); @@ -196,14 +196,14 @@ public class FeedMetadataUtil { adapterFactoryClassname = adapterEntity.getClassname(); switch (adapterType) { case INTERNAL: - adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance(); + 
adapterFactory = (ITypedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance(); break; case EXTERNAL: String[] anameComponents = adapterName.split("#"); String libraryName = anameComponents[0]; ClassLoader cl = appCtx.getLibraryManager().getLibraryClassLoader(feed.getDataverseName(), libraryName); - adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance(); + adapterFactory = (ITypedAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance(); break; default: throw new AsterixException("Unknown Adapter type " + adapterType); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java index 47db3b0..c1d8f42 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java @@ -30,7 +30,7 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; import org.apache.asterix.common.config.DatasetConfig.TransactionState; import org.apache.asterix.common.context.IStorageComponentProvider; -import org.apache.asterix.external.api.IAdapterFactory; +import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor; @@ -254,7 +254,7 @@ public class ExternalIndexingOperations { throws HyracksDataException, AlgebricksException { ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails(); Map<String, String> configuration = externalDatasetDetails.getProperties(); - 
IAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory( + ITypedAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory( metadataProvider.getApplicationContext().getServiceContext(), externalDatasetDetails.getAdapter(), configuration, (ARecordType) itemType, files, true, null); ExternalScanOperatorDescriptor scanOp = diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java index a0b10c6..3366ac1 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java @@ -45,6 +45,7 @@ import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.config.TransactionProperties; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.external.IAdapterFactoryService; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.common.replication.INcLifecycleCoordinator; @@ -98,14 +99,15 @@ public class CcApplicationContext implements ICcApplicationContext { private final IReceptionist receptionist; private final IRequestTracker requestTracker; private final IConfigValidator configValidator; + private final IAdapterFactoryService adapterFactoryService; public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc, ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider, 
IMetadataLockManager mdLockManager, IReceptionistFactory receptionistFactory, - IConfigValidatorFactory configValidatorFactory, Object extensionManager) - throws AlgebricksException, IOException { + IConfigValidatorFactory configValidatorFactory, Object extensionManager, + IAdapterFactoryService adapterFactoryService) throws AlgebricksException, IOException { this.ccServiceCtx = ccServiceCtx; this.hcc = hcc; this.libraryManager = libraryManager; @@ -139,6 +141,7 @@ public class CcApplicationContext implements ICcApplicationContext { receptionist = receptionistFactory.create(); requestTracker = new RequestTracker(this); configValidator = configValidatorFactory.create(); + this.adapterFactoryService = adapterFactoryService; } @Override @@ -306,4 +309,9 @@ public class CcApplicationContext implements ICcApplicationContext { public IRequestTracker getRequestTracker() { return requestTracker; } + + @Override + public IAdapterFactoryService getAdapterFactoryService() { + return adapterFactoryService; + } } diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml index 409b13d..7acfc04 100644 --- a/asterixdb/asterix-server/pom.xml +++ b/asterixdb/asterix-server/pom.xml @@ -172,6 +172,11 @@ <gav>io.netty:netty-all:4.1.46.Final</gav> <noticeUrl>https://raw.githubusercontent.com/netty/netty/netty-4.1.46.Final/NOTICE.txt</noticeUrl> </override> + <override> + <gav>org.reactivestreams:reactive-streams:1.0.2</gav> + <noticeUrl>https://raw.githubusercontent.com/reactive-streams/reactive-streams-jvm/v1.0.2/COPYING.txt</noticeUrl> + <url>https://raw.githubusercontent.com/reactive-streams/reactive-streams-jvm/v1.0.2/LICENSE.txt</url> + </override> </overrides> <licenses> <license> @@ -205,6 +210,7 @@ <aliasUrl>http://www.apache.org/licenses/LICENSE-2.0</aliasUrl> <aliasUrl>https://www.apache.org/licenses/LICENSE-2.0.txt</aliasUrl> <aliasUrl>http://www.apache.org/licenses/LICENSE-2.0.html</aliasUrl> + <aliasUrl>https://aws.amazon.com/apache2.0</aliasUrl> 
</aliasUrls> <metric>1</metric> </license> diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml index 25550bf..76ec3d2 100644 --- a/asterixdb/pom.xml +++ b/asterixdb/pom.xml @@ -81,6 +81,7 @@ <hyracks.version>0.3.5-SNAPSHOT</hyracks.version> <hadoop.version>2.8.5</hadoop.version> <jacoco.version>0.7.6.201602180812</jacoco.version> + <awsjavasdk.version>2.10.83</awsjavasdk.version> <implementation.title>Apache AsterixDB - ${project.name}</implementation.title> <implementation.url>https://asterixdb.apache.org/</implementation.url> @@ -1340,6 +1341,84 @@ <artifactId>reflections</artifactId> <version>0.9.12</version> </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>s3</artifactId> + <version>${awsjavasdk.version}</version> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-codec-http</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-codec-http2</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-codec</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-common</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-buffer</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-transport-native-epoll</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>regions</artifactId> + <version>${awsjavasdk.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + 
<artifactId>auth</artifactId> + <version>${awsjavasdk.version}</version> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>sdk-core</artifactId> + <version>${awsjavasdk.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + </exclusions> + </dependency> + <!-- Mock for AWS S3 --> + <dependency> + <groupId>io.findify</groupId> + <artifactId>s3mock_2.12</artifactId> + <version>0.2.5</version> + </dependency> + <!-- Needed for the s3 mock --> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-http-core_2.12</artifactId> + <version>10.1.0</version> + </dependency> </dependencies> </dependencyManagement> diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_COPYING.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_COPYING.txt new file mode 100644 index 0000000..1625c17 --- /dev/null +++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_COPYING.txt @@ -0,0 +1,121 @@ +Creative Commons Legal Code + +CC0 1.0 Universal + + CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE + LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN + ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS + INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES + REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS + PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM + THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED + HEREUNDER. 
+ +Statement of Purpose + +The laws of most jurisdictions throughout the world automatically confer +exclusive Copyright and Related Rights (defined below) upon the creator +and subsequent owner(s) (each and all, an "owner") of an original work of +authorship and/or a database (each, a "Work"). + +Certain owners wish to permanently relinquish those rights to a Work for +the purpose of contributing to a commons of creative, cultural and +scientific works ("Commons") that the public can reliably and without fear +of later claims of infringement build upon, modify, incorporate in other +works, reuse and redistribute as freely as possible in any form whatsoever +and for any purposes, including without limitation commercial purposes. +These owners may contribute to the Commons to promote the ideal of a free +culture and the further production of creative, cultural and scientific +works, or to gain reputation or greater distribution for their Work in +part through the use and efforts of others. + +For these and/or other purposes and motivations, and without any +expectation of additional consideration or compensation, the person +associating CC0 with a Work (the "Affirmer"), to the extent that he or she +is an owner of Copyright and Related Rights in the Work, voluntarily +elects to apply CC0 to the Work and publicly distribute the Work under its +terms, with knowledge of his or her Copyright and Related Rights in the +Work and the meaning and intended legal effect of CC0 on those rights. + +1. Copyright and Related Rights. A Work made available under CC0 may be +protected by copyright and related or neighboring rights ("Copyright and +Related Rights"). Copyright and Related Rights include, but are not +limited to, the following: + + i. the right to reproduce, adapt, distribute, perform, display, + communicate, and translate a Work; + ii. moral rights retained by the original author(s) and/or performer(s); +iii. 
publicity and privacy rights pertaining to a person's image or + likeness depicted in a Work; + iv. rights protecting against unfair competition in regards to a Work, + subject to the limitations in paragraph 4(a), below; + v. rights protecting the extraction, dissemination, use and reuse of data + in a Work; + vi. database rights (such as those arising under Directive 96/9/EC of the + European Parliament and of the Council of 11 March 1996 on the legal + protection of databases, and under any national implementation + thereof, including any amended or successor version of such + directive); and +vii. other similar, equivalent or corresponding rights throughout the + world based on applicable law or treaty, and any national + implementations thereof. + +2. Waiver. To the greatest extent permitted by, but not in contravention +of, applicable law, Affirmer hereby overtly, fully, permanently, +irrevocably and unconditionally waives, abandons, and surrenders all of +Affirmer's Copyright and Related Rights and associated claims and causes +of action, whether now known or unknown (including existing as well as +future claims and causes of action), in the Work (i) in all territories +worldwide, (ii) for the maximum duration provided by applicable law or +treaty (including future time extensions), (iii) in any current or future +medium and for any number of copies, and (iv) for any purpose whatsoever, +including without limitation commercial, advertising or promotional +purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each +member of the public at large and to the detriment of Affirmer's heirs and +successors, fully intending that such Waiver shall not be subject to +revocation, rescission, cancellation, termination, or any other legal or +equitable action to disrupt the quiet enjoyment of the Work by the public +as contemplated by Affirmer's express Statement of Purpose. + +3. Public License Fallback. 
Should any part of the Waiver for any reason +be judged legally invalid or ineffective under applicable law, then the +Waiver shall be preserved to the maximum extent permitted taking into +account Affirmer's express Statement of Purpose. In addition, to the +extent the Waiver is so judged Affirmer hereby grants to each affected +person a royalty-free, non transferable, non sublicensable, non exclusive, +irrevocable and unconditional license to exercise Affirmer's Copyright and +Related Rights in the Work (i) in all territories worldwide, (ii) for the +maximum duration provided by applicable law or treaty (including future +time extensions), (iii) in any current or future medium and for any number +of copies, and (iv) for any purpose whatsoever, including without +limitation commercial, advertising or promotional purposes (the +"License"). The License shall be deemed effective as of the date CC0 was +applied by Affirmer to the Work. Should any part of the License for any +reason be judged legally invalid or ineffective under applicable law, such +partial invalidity or ineffectiveness shall not invalidate the remainder +of the License, and in such case Affirmer hereby affirms that he or she +will not (i) exercise any of his or her remaining Copyright and Related +Rights in the Work or (ii) assert any associated claims and causes of +action with respect to the Work, in either case contrary to Affirmer's +express Statement of Purpose. + +4. Limitations and Disclaimers. + + a. No trademark or patent rights held by Affirmer are waived, abandoned, + surrendered, licensed or otherwise affected by this document. + b. 
Affirmer offers the Work as-is and makes no representations or + warranties of any kind concerning the Work, express, implied, + statutory or otherwise, including without limitation warranties of + title, merchantability, fitness for a particular purpose, non + infringement, or the absence of latent or other defects, accuracy, or + the present or absence of errors, whether or not discoverable, all to + the greatest extent permissible under applicable law. + c. Affirmer disclaims responsibility for clearing rights of other persons + that may apply to the Work or any use thereof, including without + limitation any person's Copyright and Related Rights in the Work. + Further, Affirmer disclaims responsibility for obtaining any necessary + consents, permissions or other rights required for any use of the + Work. + d. Affirmer understands and acknowledges that Creative Commons is not a + party to this document and has no duty or obligation with respect to + this CC0 or use of the Work. \ No newline at end of file diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_LICENSE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_LICENSE.txt new file mode 100644 index 0000000..eadae05 --- /dev/null +++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_LICENSE.txt @@ -0,0 +1,8 @@ +Licensed under Public Domain (CC0) + +To the extent possible under law, the person who associated CC0 with +this code has waived all copyright and related or neighboring +rights to this code. + +You should have received a copy of the CC0 legalcode along with this +work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>. \ No newline at end of file
