This is an automated email from the ASF dual-hosted git repository.

imaxon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 11da98844ddb70dad3a04e84716f847f983d1fbc
Merge: 3be0c68f52 698e45c105
Author: Ian Maxon <[email protected]>
AuthorDate: Thu Oct 2 22:22:24 2025 -0700

    Merge branch 'phoenix'

    Change-Id: I2812c034a6d002bf906a0dff7a26aa7403717e3c

 asterixdb/asterix-app/pom.xml                      | 98 +++++++-------------
 .../api/common/CloudStorageIntegrationUtil.java    |  9 +-
 .../apache/asterix/api/common/LocalCloudUtil.java  | 10 ++-
 .../api/common/LocalCloudUtilAdobeMock.java        | 99 ++++++++++++++--------
 .../AtomicMetadataTransactionWithoutWALTest.java   |  8 +-
 .../AtomicStatementsCancellationTest.java          | 23 +++--
 .../test/cloud_storage/CloudPythonTest.java        | 19 ++++-
 .../test/cloud_storage/CloudStorageAzTest.java     | 51 ++++++++---
 .../test/cloud_storage/CloudStorageGCSTest.java    |  5 +-
 .../test/cloud_storage/CloudStorageSparseTest.java | 31 +++++--
 .../test/cloud_storage/CloudStorageTest.java       | 27 ++++--
 .../cloud_storage/CloudStorageUnstableTest.java    | 10 ++-
 .../cloud_storage/GCSCloudStorageUnstableTest.java |  6 +-
 .../apache/asterix/test/common/TestConstants.java  |  1 +
 .../apache/asterix/test/common/TestExecutor.java   | 13 ++-
 .../apache/asterix/test/iceberg/IcebergTest.java   | 24 ++++--
 .../runtime/SqlppSinglePartitionExecutionTest.java | 10 +++
 ...age-sparse.conf => cc-cloud-storage-azblob.ftl} | 15 ++--
 ...orage-sparse.conf => cc-cloud-storage-main.ftl} |  6 +-
 ...parse.conf => cc-cloud-storage-sparse.conf.ftl} |  2 +-
 ...orage-sparse.conf => cc-cloud-storage.conf.ftl} |  4 +-
 .../post-non-query/post-non-query.099.post.http}   | 10 ++-
 ... => array_slice_bracket_notation.099.ddl.sqlpp} |  3 +-
 .../array_slice_bracket_notation.1.ddl.sqlpp       |  0
 .../array_slice_bracket_notation.2.update.sqlpp    |  0
 .../array_slice_bracket_notation.3.query.sqlpp     |  0
 .../array_slice_bracket_notation.4.ddl.sqlpp       |  0
 .../external-collection/test.010.ddl.sqlpp         |  8 +-
 .../missing-inner-array.001.ddl.sqlpp}             | 11 ++-
 .../missing-inner-array.002.update.sqlpp}          | 14 ++-
 .../missing-inner-array.003.query.sqlpp}           |  4 +-
 .../hash_join_exchange.099.ddl.sqlpp}              |  7 +-
 .../hash_join_exchange_1.099.ddl.sqlpp}            |  7 +-
 .../join-with-empty-dataset.099.ddl.sqlpp}         |  5 +-
 ...uery.sqlpp => limit_after_offset.099.ddl.sqlpp} |  3 +-
 .../limit_after_offset.1.ddl.sqlpp}                |  4 +-
 .../limit_after_offset.2.query.sqlpp}              |  5 +-
 .../api/post-non-query/post-non-query.099.adm      |  1 +
 .../missing-inner-array/missing-null-array.003.adm |  1 +
 .../src/test/resources/runtimets/sqlpp_queries.xml |  5 ++
 .../runtimets/testsuite_cloud_storage.xml          |  2 +
 .../runtimets/testsuite_single_partition_sqlpp.xml |  5 ++
 asterixdb/asterix-cloud/pom.xml                    | 96 ++++++---------------
 .../asterix/cloud/azure/LSMAzBlobStorageTest.java  | 84 +++++++++++++++---
 .../assembler/ArrayWithUnionValueAssembler.java    | 23 +++--
 .../assembler/RepeatedPrimitiveValueAssembler.java | 15 ++--
 .../values/reader/AbstractColumnValuesReader.java  |  5 +-
 asterixdb/pom.xml                                  | 20 +++--
 .../apache/hyracks/api/util/ExceptionUtils.java    | 18 ++++
 .../cloud/cache/unit/SweepableIndexUnit.java       |  2 +-
 .../apache/hyracks/cloud/sweeper/SweepContext.java | 14 ++-
 .../hyracks/storage/common/disk/ISweepContext.java |  2 +-
 .../storage/common/file/FileMapManager.java        |  4 +-
 53 files changed, 529 insertions(+), 320 deletions(-)

diff --cc asterixdb/asterix-app/src/test/java/org/apache/asterix/test/iceberg/IcebergTest.java
index 5f1eae4a9b,0000000000..1afc682849
mode 100644,000000..100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/iceberg/IcebergTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/iceberg/IcebergTest.java
@@@ -1,323 -1,0 +1,333 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements. See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership. The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License. You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied. See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.test.iceberg;
 +
++import static org.apache.asterix.api.common.LocalCloudUtilAdobeMock.fillConfigTemplate;
++import static org.apache.asterix.test.cloud_storage.CloudStorageTest.CONFIG_FILE;
++import static org.apache.asterix.test.cloud_storage.CloudStorageTest.CONFIG_FILE_TEMPLATE;
++import static org.apache.asterix.test.cloud_storage.CloudStorageTest.MOCK_SERVER_HOSTNAME_FRAGMENT;
 +import static org.apache.hyracks.util.file.FileUtil.joinPath;
 +import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
 +import static org.apache.iceberg.types.Types.NestedField.required;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.net.URI;
 +import java.nio.file.Files;
 +import java.nio.file.Paths;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.zip.GZIPOutputStream;
 +
 +import org.apache.asterix.api.common.LocalCloudUtilAdobeMock;
 +import org.apache.asterix.common.config.GlobalConfig;
 +import org.apache.asterix.external.util.aws.s3.S3Constants;
 +import org.apache.asterix.test.common.TestConstants;
 +import org.apache.asterix.test.common.TestExecutor;
 +import org.apache.asterix.test.runtime.LangExecutionUtil;
 +import org.apache.asterix.testframework.context.TestCaseContext;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.iceberg.DataFile;
 +import org.apache.iceberg.DataFiles;
 +import org.apache.iceberg.FileFormat;
 +import org.apache.iceberg.PartitionSpec;
 +import org.apache.iceberg.Schema;
 +import org.apache.iceberg.Table;
 +import org.apache.iceberg.TableProperties;
 +import org.apache.iceberg.Tables;
 +import org.apache.iceberg.data.GenericAppenderFactory;
 +import org.apache.iceberg.data.GenericRecord;
 +import org.apache.iceberg.data.Record;
 +import org.apache.iceberg.hadoop.HadoopInputFile;
 +import org.apache.iceberg.hadoop.HadoopTables;
 +import org.apache.iceberg.io.FileAppender;
 +import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 +import org.apache.iceberg.types.Types;
 +import org.apache.logging.log4j.LogManager;
 +import org.apache.logging.log4j.Logger;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.FixMethodOrder;
 +import org.junit.Test;
 +import org.junit.runner.RunWith;
 +import org.junit.runners.MethodSorters;
 +import org.junit.runners.Parameterized;
 +import org.junit.runners.Parameterized.Parameters;
 +
++import com.adobe.testing.s3mock.testcontainers.S3MockContainer;
++
 +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 +import software.amazon.awssdk.core.sync.RequestBody;
 +import software.amazon.awssdk.regions.Region;
 +import software.amazon.awssdk.services.s3.S3Client;
 +import software.amazon.awssdk.services.s3.S3ClientBuilder;
 +import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
 +import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 +
 +/**
 + * Runs an AWS S3 mock server and test for iceberg catalogs and tables
 + */
 +@RunWith(Parameterized.class)
 +@FixMethodOrder(MethodSorters.NAME_ASCENDING)
 +public class IcebergTest {
 +
 +    private static final Logger LOGGER = LogManager.getLogger();
 +
 +    private static final String PATH_BASE = joinPath("data");
 +    private static final String EXTERNAL_FILTER_DATA_PATH = joinPath(PATH_BASE, "json", "external-filter");
 +
 +    private static final String SUITE_TESTS = "testsuite_iceberg.xml";
 +    private static final String ONLY_TESTS = "testsuite_iceberg_only.xml";
-     private static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage.conf";
 +    static Runnable PREPARE_ICEBERG_TABLE_BUCKET;
 +
 +    // Client and service endpoint
 +    private static S3Client client;
 +    private static final int MOCK_SERVER_PORT = 8001;
-     private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
 +    private static final String MOCK_SERVER_REGION = "us-west-2";
 +
 +    protected TestCaseContext tcCtx;
 +
 +    public static final String ICEBERG_CONTAINER = "iceberg-improved-container";
 +    public static final PutObjectRequest.Builder icebergContainerBuilder =
 +            PutObjectRequest.builder().bucket(ICEBERG_CONTAINER);
 +
 +    public IcebergTest(TestCaseContext tcCtx) {
 +        this.tcCtx = tcCtx;
 +    }
 +
 +    // iceberg
 +    private static final Schema SCHEMA =
 +            new Schema(required(1, "id", Types.IntegerType.get()), required(2, "data", Types.StringType.get()));
 +    private static final Configuration CONF = new Configuration();
 +
 +    private static final String ICEBERG_TABLE_PATH = "s3a://" + ICEBERG_CONTAINER + "/my-table/";
 +    private static final String ICEBERG_TABLE_PATH_FORMAT_VERSION_2 =
 +            "s3a://" + ICEBERG_CONTAINER + "/my-table-format-version-2/";
 +    private static final String ICEBERG_TABLE_PATH_MIXED_DATA_FORMAT =
 +            "s3a://" + ICEBERG_CONTAINER + "/my-table-mixed-data-format/";
 +
 +    private static final String ICEBERG_TABLE_PATH_EMPTY = "s3a://" + ICEBERG_CONTAINER + "/my-table-empty/";
 +
 +    private static final String ICEBERG_TABLE_PATH_MULTIPLE_DATA_FILES =
 +            "s3a://" + ICEBERG_CONTAINER + "/my-table-multiple-data-files/";
 +
 +    private static final String ICEBERG_TABLE_PATH_MODIFIED_DATA =
 +            "s3a://" + ICEBERG_CONTAINER + "/my-table-modified-data/";
 +
++    private static S3MockContainer s3Mock;
++
 +    @BeforeClass
 +    public static void setUp() throws Exception {
 +        final TestExecutor testExecutor = new TestExecutor();
-         LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
++        s3Mock = LocalCloudUtilAdobeMock.startS3CloudEnvironment(true);
 +        testExecutor.executorId = "cloud";
 +        testExecutor.stripSubstring = "//DB:";
-         LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
-         System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE_NAME);
++        fillConfigTemplate(MOCK_SERVER_HOSTNAME_FRAGMENT + s3Mock.getHttpServerPort(), CONFIG_FILE_TEMPLATE,
++                CONFIG_FILE);
++        System.setProperty(TestConstants.S3_SERVICE_ENDPOINT_KEY,
++                MOCK_SERVER_HOSTNAME_FRAGMENT + s3Mock.getHttpServerPort());
++        LangExecutionUtil.setUp(CONFIG_FILE, testExecutor);
++        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE);
 +
 +        // create the iceberg bucket
 +        S3ClientBuilder builder = S3Client.builder();
-         URI endpoint = URI.create(MOCK_SERVER_HOSTNAME); // endpoint pointing to S3 mock server
++        URI endpoint = URI.create(MOCK_SERVER_HOSTNAME_FRAGMENT + s3Mock.getHttpServerPort()); // endpoint pointing to S3 mock server
 +        builder.region(Region.of(MOCK_SERVER_REGION)).credentialsProvider(AnonymousCredentialsProvider.create())
 +                .endpointOverride(endpoint);
 +        S3Client client = builder.build();
 +        client.createBucket(CreateBucketRequest.builder().bucket(ICEBERG_CONTAINER).build());
 +        client.close();
 +    }
 +
 +    @AfterClass
 +    public static void tearDown() throws Exception {
 +        LangExecutionUtil.tearDown();
 +        LocalCloudUtilAdobeMock.shutdownSilently();
 +    }
 +
 +    @Parameters(name = "IcebergTest {index}: {0}")
 +    public static Collection<Object[]> tests() throws Exception {
 +        return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
 +    }
 +
 +    @Test
 +    public void test() throws Exception {
 +        LangExecutionUtil.test(tcCtx);
 +    }
 +
 +    private static DataFile writeFile(String filename, List<Record> records, String location) throws IOException {
 +        Path path = new Path(location, filename);
 +        FileFormat fileFormat = FileFormat.fromFileName(filename);
 +        Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
 +
 +        FileAppender<Record> fileAppender =
 +                new GenericAppenderFactory(IcebergTest.SCHEMA).newAppender(fromPath(path, CONF), fileFormat);
 +        try (FileAppender<Record> appender = fileAppender) {
 +            appender.addAll(records);
 +        }
 +
 +        return DataFiles.builder(PartitionSpec.unpartitioned()).withInputFile(HadoopInputFile.fromPath(path, CONF))
 +                .withMetrics(fileAppender.metrics()).build();
 +    }
 +
 +    private static void prepareIcebergConfiguration() {
-         CONF.set(S3Constants.HADOOP_SERVICE_END_POINT, MOCK_SERVER_HOSTNAME);
++        CONF.set(S3Constants.HADOOP_SERVICE_END_POINT, s3Mock.getHttpEndpoint());
 +        // switch to http
 +        CONF.set("fs.s3a.connection.ssl.enabled", "false");
 +        // forces URL style access which is required by the mock. Overwrites DNS based bucket access scheme.
 +        CONF.set("fs.s3a.path.style.access", "true");
 +        // Mock server doesn't support concurrency control
 +        CONF.set("fs.s3a.change.detection.version.required", "false");
 +        CONF.set(S3Constants.HADOOP_ACCESS_KEY_ID, TestConstants.S3_ACCESS_KEY_ID_DEFAULT);
 +        CONF.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, TestConstants.S3_SECRET_ACCESS_KEY_DEFAULT);
 +    }
 +
 +    public static void prepareIcebergTableContainer() {
 +        prepareIcebergConfiguration();
 +        Tables tables = new HadoopTables(CONF);
 +
 +        // test data
 +        Record genericRecord = GenericRecord.create(SCHEMA);
 +
 +        List<Record> fileFirstSnapshotRecords =
 +                ImmutableList.of(genericRecord.copy(ImmutableMap.of("id", 0, "data", "vibrant_mclean")),
 +                        genericRecord.copy(ImmutableMap.of("id", 1, "data", "frosty_wilson")),
 +                        genericRecord.copy(ImmutableMap.of("id", 2, "data", "serene_kirby")));
 +
 +        List<Record> fileSecondSnapshotRecords =
 +                ImmutableList.of(genericRecord.copy(ImmutableMap.of("id", 3, "data", "peaceful_pare")),
 +                        genericRecord.copy(ImmutableMap.of("id", 4, "data", "laughing_mahavira")),
 +                        genericRecord.copy(ImmutableMap.of("id", 5, "data", "vibrant_lamport")));
 +
 +        // create the table
 +        Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
 +                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()), ICEBERG_TABLE_PATH);
 +
 +        // load test data
 +        try {
 +            DataFile file = writeFile(FileFormat.PARQUET.addExtension("file"), fileFirstSnapshotRecords,
 +                    IcebergTest.ICEBERG_TABLE_PATH);
 +            table.newAppend().appendFile(file).commit();
 +        } catch (IOException e) {
 +            throw new RuntimeException(e);
 +        }
 +
 +        // create a table with unsupported iceberg version
 +        Table unsupportedTable = tables.create(SCHEMA,
 +                PartitionSpec.unpartitioned(), ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT,
 +                        FileFormat.PARQUET.name(), TableProperties.FORMAT_VERSION, "2"),
 +                ICEBERG_TABLE_PATH_FORMAT_VERSION_2);
 +
 +        // load test data
 +        try {
 +            DataFile file = writeFile(FileFormat.PARQUET.addExtension("file"), fileFirstSnapshotRecords,
 +                    IcebergTest.ICEBERG_TABLE_PATH_FORMAT_VERSION_2);
 +            unsupportedTable.newAppend().appendFile(file).commit();
 +        } catch (IOException e) {
 +            throw new RuntimeException(e);
 +        }
 +
 +        // create a table with mix of parquet and avro data files
 +        Table mixedDataFormats = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
 +                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()),
 +                ICEBERG_TABLE_PATH_MIXED_DATA_FORMAT);
 +
 +        // load test data
 +        try {
 +            DataFile parquetFile = writeFile(FileFormat.PARQUET.addExtension("parquet-file"), fileFirstSnapshotRecords,
 +                    IcebergTest.ICEBERG_TABLE_PATH_MIXED_DATA_FORMAT);
 +            DataFile avroFile = writeFile(FileFormat.AVRO.addExtension("avro-file"), fileSecondSnapshotRecords,
 +                    IcebergTest.ICEBERG_TABLE_PATH_MIXED_DATA_FORMAT);
 +
 +            mixedDataFormats.newAppend().appendFile(parquetFile).appendFile(avroFile).commit();
 +        } catch (IOException e) {
 +            throw new RuntimeException(e);
 +        }
 +
 +        // empty table
 +        tables.create(SCHEMA, PartitionSpec.unpartitioned(),
 +                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()),
 +                ICEBERG_TABLE_PATH_EMPTY);
 +
 +        // multiple data files
 +
 +        Table multipleDataFiles = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
 +                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()),
 +                ICEBERG_TABLE_PATH_MULTIPLE_DATA_FILES);
 +
 +        // load test data
 +        try {
 +            DataFile file1 = writeFile(FileFormat.PARQUET.addExtension("file-1"), fileFirstSnapshotRecords,
 +                    IcebergTest.ICEBERG_TABLE_PATH_MULTIPLE_DATA_FILES);
 +            DataFile file2 = writeFile(FileFormat.PARQUET.addExtension("file-2"), fileSecondSnapshotRecords,
 +                    IcebergTest.ICEBERG_TABLE_PATH_MULTIPLE_DATA_FILES);
 +
 +            multipleDataFiles.newAppend().appendFile(file1).appendFile(file2).commit();
 +        } catch (IOException e) {
 +            throw new RuntimeException(e);
 +        }
 +
 +        // modify data
 +        Table modifiedData = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
 +                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()),
 +                ICEBERG_TABLE_PATH_MODIFIED_DATA);
 +
 +        // load test data
 +        try {
 +            DataFile file1 = writeFile(FileFormat.PARQUET.addExtension("file-1"), fileFirstSnapshotRecords,
 +                    IcebergTest.ICEBERG_TABLE_PATH_MODIFIED_DATA);
 +            DataFile file2 = writeFile(FileFormat.PARQUET.addExtension("file-2"), fileSecondSnapshotRecords,
 +                    IcebergTest.ICEBERG_TABLE_PATH_MODIFIED_DATA);
 +
 +            modifiedData.newAppend().appendFile(file1).appendFile(file2).commit();
 +            modifiedData.newDelete().deleteFile(file1).commit();
 +
 +        } catch (IOException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    private static RequestBody getRequestBody(String content, boolean fromFile, boolean gzipped) {
 +        RequestBody body;
 +        // Content is string
 +        if (!fromFile) {
 +            body = RequestBody.fromString(content);
 +        } else {
 +            // Content is a file path
 +            if (!gzipped) {
 +                body = RequestBody.fromFile(Paths.get(content));
 +            } else {
 +                try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
 +                        GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
 +                    gzipOutputStream.write(Files.readAllBytes(Paths.get(content)));
 +                    gzipOutputStream.close(); // Need to close or data will be invalid
 +                    byte[] gzipBytes = byteArrayOutputStream.toByteArray();
 +                    body = RequestBody.fromBytes(gzipBytes);
 +                } catch (IOException ex) {
 +                    throw new IllegalArgumentException(ex.toString());
 +                }
 +            }
 +        }
 +
 +        return body;
 +    }
 +}
