vinothchandar commented on code in PR #17951: URL: https://github.com/apache/hudi/pull/17951#discussion_r2852857934
########## hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/ADLSStorageLockClient.java: ########## @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.azure.transaction.lock; + +import org.apache.hudi.client.transaction.lock.StorageLockClient; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.util.Functions; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.config.StorageBasedLockConfig; + +import com.azure.core.exception.HttpResponseException; +import com.azure.core.http.policy.ExponentialBackoffOptions; +import com.azure.core.http.policy.RetryOptions; Review Comment: is this class just used by tests? 
<img width="606" height="283" alt="Image" src="https://github.com/user-attachments/assets/8be77b55-3157-42d7-bc50-773ca52d9919" /> ########## hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java: ########## @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.azure.transaction.lock; + +import org.apache.hudi.client.transaction.lock.StorageLockClient; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.util.Functions; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.config.AzureStorageLockConfig; +import org.apache.hudi.config.StorageBasedLockConfig; +import org.apache.hudi.common.config.TypedProperties; + +import com.azure.core.credential.AzureSasCredential; +import com.azure.core.exception.HttpResponseException; +import com.azure.core.http.policy.ExponentialBackoffOptions; +import com.azure.core.http.policy.RetryOptions; +import com.azure.core.http.rest.Response; +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.HttpClientOptions; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.BlobUrlParts; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobRequestConditions; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.BlockBlobItem; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import lombok.AllArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; + +import java.net.URI; +import 
java.net.URISyntaxException; +import java.time.Duration; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; + +/** + * Azure Storage implementation of {@link StorageLockClient} using Azure Blob conditional requests. + * + * <p>Supports the following URI schemes: + * <ul> + * <li>ADLS Gen2: {@code abfs://} and {@code abfss://}</li> + * <li>Azure Blob Storage: {@code wasb://} and {@code wasbs://}</li> + * </ul> + * + * + * <ul> + * <li>Create: conditional write with If-None-Match: *</li> + * <li>Update/Renew/Expire: conditional write with If-Match: <etag></li> + * </ul> + * + * <p>Expected lock URI formats: + * <ul> + * <li>{@code abfs://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code abfss://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code wasb://<container>@<account>.blob.core.windows.net/<path>}</li> + * <li>{@code wasbs://<container>@<account>.blob.core.windows.net/<path>}</li> + * </ul> + * + * <p>Authentication precedence (via {@link Properties}): + * <ul> + * <li>{@link AzureStorageLockConfig#AZURE_CONNECTION_STRING}</li> + * <li>{@link AzureStorageLockConfig#AZURE_SAS_TOKEN}</li> + * <li>DefaultAzureCredential</li> + * </ul> + */ +@ThreadSafe +public class AzureStorageLockClient implements StorageLockClient { + + private static final int PRECONDITION_FAILURE_ERROR_CODE = 412; + private static final int NOT_FOUND_ERROR_CODE = 404; + private static final int CONFLICT_ERROR_CODE = 409; + private static final int RATE_LIMIT_ERROR_CODE = 429; + private static final int INTERNAL_SERVER_ERROR_CODE_MIN = 500; + + public static final String AZURE_CONNECTION_STRING = AzureStorageLockConfig.AZURE_CONNECTION_STRING.key(); + public static final String AZURE_SAS_TOKEN = AzureStorageLockConfig.AZURE_SAS_TOKEN.key(); + + private 
final Logger logger; + private final BlobServiceClient blobServiceClient; + private final Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier; + private final ConcurrentMap<String, BlobServiceClient> secondaryBlobServiceClients; + private final BlobClient lockBlobClient; + private final Properties clientProperties; + private final String ownerId; + private final String lockFileUri; + private final String lockBlobEndpoint; + + /** + * Constructor used by reflection by {@link org.apache.hudi.client.transaction.lock.StorageBasedLockProvider}. + * + * @param ownerId lock owner id + * @param lockFileUri lock file URI (abfs/abfss/wasb/wasbs) + * @param props properties used to customize/authenticate the Azure client + */ + public AzureStorageLockClient(String ownerId, String lockFileUri, Properties props) { + this(ownerId, lockFileUri, props, createDefaultBlobServiceClient(), LoggerFactory.getLogger(AzureStorageLockClient.class)); + } + + @VisibleForTesting + AzureStorageLockClient( + String ownerId, + String lockFileUri, + Properties props, + Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier, + Logger logger) { + this.ownerId = ownerId; + this.lockFileUri = lockFileUri; + this.logger = logger; + this.clientProperties = props; + this.blobServiceClientSupplier = blobServiceClientSupplier; + this.secondaryBlobServiceClients = new ConcurrentHashMap<>(); + + AzureLocation location = parseAzureLocation(lockFileUri).withProperties(props); + this.lockBlobEndpoint = location.blobEndpoint; + this.blobServiceClient = blobServiceClientSupplier.apply(location); + BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(location.container); + this.lockBlobClient = containerClient.getBlobClient(location.blobPath); + } + + private static Functions.Function1<AzureLocation, BlobServiceClient> createDefaultBlobServiceClient() { + return (location) -> { + Properties props = location.props; + 
BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); + configureAzureClientOptions(builder, props); + + String connectionString = props == null ? null : props.getProperty(AZURE_CONNECTION_STRING); + if (connectionString != null && !connectionString.trim().isEmpty()) { + return builder.connectionString(connectionString).buildClient(); + } + + builder.endpoint(location.blobEndpoint); + String sasToken = props == null ? null : props.getProperty(AZURE_SAS_TOKEN); + if (sasToken != null && !sasToken.trim().isEmpty()) { + String cleaned = sasToken.startsWith("?") ? sasToken.substring(1) : sasToken; + return builder.credential(new AzureSasCredential(cleaned)).buildClient(); + } + + return builder.credential(new DefaultAzureCredentialBuilder().build()).buildClient(); + }; + } + + private static void configureAzureClientOptions(BlobServiceClientBuilder builder, Properties props) { + // Set Azure SDK timeouts based on lock validity to avoid long-hanging calls. + TypedProperties typedProps = new TypedProperties(); + if (props != null) { + typedProps.putAll(props); + } + long validityTimeoutSecs; + try { + validityTimeoutSecs = getLongWithAltKeys(typedProps, StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS); + } catch (NumberFormatException e) { + validityTimeoutSecs = StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.defaultValue(); + } + long azureCallTimeoutSecs = Math.max(1, validityTimeoutSecs / 5); + + // Disable automatic SDK retries; Hudi manages retries at the lock-provider level. 
+ ExponentialBackoffOptions exponentialOptions = new ExponentialBackoffOptions().setMaxRetries(0); + RetryOptions retryOptions = new RetryOptions(exponentialOptions); + + HttpClientOptions clientOptions = new HttpClientOptions() + .setResponseTimeout(Duration.ofSeconds(azureCallTimeoutSecs)) + .setReadTimeout(Duration.ofSeconds(azureCallTimeoutSecs)); + + builder.retryOptions(retryOptions).clientOptions(clientOptions); + } + + @Override + public Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile( + StorageLockData newLockData, + Option<StorageLockFile> previousLockFile) { + String expectedEtag = previousLockFile.isPresent() ? previousLockFile.get().getVersionId() : null; + try { + StorageLockFile updated = createOrUpdateLockFileInternal(newLockData, expectedEtag); + return Pair.of(LockUpsertResult.SUCCESS, Option.of(updated)); + } catch (BlobStorageException e) { + return Pair.of(handleUpsertBlobStorageException(e), Option.empty()); + } catch (HttpResponseException e) { + logger.error("OwnerId: {}, Unexpected Azure SDK error while writing lock file: {}", + ownerId, lockFileUri, e); + if (!previousLockFile.isPresent()) { + // For create, fail fast since this indicates a larger issue. + throw e; + } + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } catch (Exception e) { + logger.error("OwnerId: {}, Unexpected error while writing lock file: {}", ownerId, lockFileUri, e); + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } + } + + @Override + public Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile() { + try { + Response<BinaryData> response = lockBlobClient.downloadContentWithResponse(null, null, null, Context.NONE); + String eTag = response.getHeaders() != null ? response.getHeaders().getValue("ETag") : null; Review Comment: I am a little nervous about the `eTag` being `null`. Won't we pass the null back as the precondition check next time, so that it'll always succeed? Should we error out instead? 
########## hudi-azure/src/main/java/org/apache/hudi/config/AzureStorageLockConfig.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.LockConfiguration; + +/** + * Hoodie Configs for Azure based storage locks. + */ +@ConfigClassProperty(name = "Azure based Locks Configurations", + groupName = ConfigGroups.Names.WRITE_CLIENT, + subGroupName = ConfigGroups.SubGroupNames.LOCK, + description = "Configs that control Azure Blob/ADLS based locking mechanisms " + + "required for concurrency control between writers to a Hudi table.") +public class AzureStorageLockConfig extends HoodieConfig { + + private static final String AZURE_BASED_LOCK_PROPERTY_PREFIX = LockConfiguration.LOCK_PREFIX + "azure."; + + public static final ConfigProperty<String> AZURE_CONNECTION_STRING = ConfigProperty Review Comment: same question as above - on whether this is typical azure production setup. 
Per some cursory research, it seems Azure calls out some risks/precautions with connection strings: https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string ########## hudi-azure/src/main/java/org/apache/hudi/config/AzureStorageLockConfig.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.LockConfiguration; + +/** + * Hoodie Configs for Azure based storage locks. 
+ */ +@ConfigClassProperty(name = "Azure based Locks Configurations", + groupName = ConfigGroups.Names.WRITE_CLIENT, + subGroupName = ConfigGroups.SubGroupNames.LOCK, + description = "Configs that control Azure Blob/ADLS based locking mechanisms " + + "required for concurrency control between writers to a Hudi table.") +public class AzureStorageLockConfig extends HoodieConfig { + + private static final String AZURE_BASED_LOCK_PROPERTY_PREFIX = LockConfiguration.LOCK_PREFIX + "azure."; + + public static final ConfigProperty<String> AZURE_CONNECTION_STRING = ConfigProperty Review Comment: If you intend this for dev/testing, let's update the docs to say so. ########## hudi-azure/src/main/java/org/apache/hudi/config/AzureStorageLockConfig.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.LockConfiguration; + +/** + * Hoodie Configs for Azure based storage locks. 
+ */ +@ConfigClassProperty(name = "Azure based Locks Configurations", + groupName = ConfigGroups.Names.WRITE_CLIENT, + subGroupName = ConfigGroups.SubGroupNames.LOCK, + description = "Configs that control Azure Blob/ADLS based locking mechanisms " + + "required for concurrency control between writers to a Hudi table.") +public class AzureStorageLockConfig extends HoodieConfig { + + private static final String AZURE_BASED_LOCK_PROPERTY_PREFIX = LockConfiguration.LOCK_PREFIX + "azure."; + + public static final ConfigProperty<String> AZURE_CONNECTION_STRING = ConfigProperty + .key(AZURE_BASED_LOCK_PROPERTY_PREFIX + "connection.string") + .noDefaultValue() + .markAdvanced() + .withDocumentation("For Azure based lock provider, optional Azure Storage connection string used " + + "for authenticating BlobServiceClient."); + + public static final ConfigProperty<String> AZURE_SAS_TOKEN = ConfigProperty + .key(AZURE_BASED_LOCK_PROPERTY_PREFIX + "sas.token") + .noDefaultValue() + .markAdvanced() + .withDocumentation("For Azure based lock provider, optional SAS token used for " Review Comment: nit: document the minimum permissions required on the token. Read, Write, Create? ########## pom.xml: ########## @@ -112,7 +114,7 @@ <confluent.version>5.5.0</confluent.version> <glassfish.version>2.17</glassfish.version> <glassfish.el.version>3.0.1-b12</glassfish.el.version> - <parquet.version>1.10.1</parquet.version> + <parquet.version>1.13.1</parquet.version> Review Comment: is this change and the version bump below related to the locking somehow? if not, can we revert.. and decouple from this PR. ########## hudi-azure/pom.xml: ########## @@ -0,0 +1,240 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>hudi</artifactId> + <groupId>org.apache.hudi</groupId> + <version>1.2.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>hudi-azure</artifactId> + <packaging>jar</packaging> + + <properties> + <!-- Azure SDK versions are intentionally pinned here to keep this module self-contained. --> + <azure.storage.blob.version>12.26.0</azure.storage.blob.version> + <azure.identity.version>1.12.2</azure.identity.version> + <!-- Keep Jackson consistent for Azure SDK integration tests (Hudi core pins older Jackson versions). 
--> + <azure.jackson.version>2.13.5</azure.jackson.version> + </properties> + + <dependencies> + <!-- Logging --> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-1.2-api</artifactId> + </dependency> + + <!-- Lombok --> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + </dependency> + + <!-- Hoodie --> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-client-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-common</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Hadoop (needed by shared test utilities / configs, consistent with hudi-gcp) --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + </dependency> + + <!-- Azure SDK (Blob + DefaultAzureCredential) --> + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-storage-blob</artifactId> + <version>${azure.storage.blob.version}</version> + </dependency> + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-identity</artifactId> + <version>${azure.identity.version}</version> + </dependency> + + <!-- Test --> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-tests-common</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-client-common</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-hadoop-common</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-common</artifactId> + <version>${project.version}</version> + 
<classifier>tests</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <!-- Azure SDK requires a consistent Jackson stack for XML support; override for tests --> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${azure.jackson.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${azure.jackson.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${azure.jackson.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.datatype</groupId> + <artifactId>jackson-datatype-jsr310</artifactId> + <version>${azure.jackson.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-xml</artifactId> + <version>${azure.jackson.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>junit-jupiter</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo-shaded</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>azure-integration-tests</id> Review Comment: I approved some extra workflows, that showed up. 
but +1 lets verify ########## hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java: ########## @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.azure.transaction.lock; + +import org.apache.hudi.client.transaction.lock.StorageLockClient; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.util.Functions; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.config.AzureStorageLockConfig; +import org.apache.hudi.config.StorageBasedLockConfig; +import org.apache.hudi.common.config.TypedProperties; + +import com.azure.core.credential.AzureSasCredential; +import com.azure.core.exception.HttpResponseException; +import com.azure.core.http.policy.ExponentialBackoffOptions; +import 
com.azure.core.http.policy.RetryOptions; +import com.azure.core.http.rest.Response; +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.HttpClientOptions; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.BlobUrlParts; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobRequestConditions; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.BlockBlobItem; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import lombok.AllArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; + +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; + +/** + * Azure Storage implementation of {@link StorageLockClient} using Azure Blob conditional requests. 
+ * + * <p>Supports the following URI schemes: + * <ul> + * <li>ADLS Gen2: {@code abfs://} and {@code abfss://}</li> + * <li>Azure Blob Storage: {@code wasb://} and {@code wasbs://}</li> + * </ul> + * + * + * <ul> + * <li>Create: conditional write with If-None-Match: *</li> + * <li>Update/Renew/Expire: conditional write with If-Match: <etag></li> + * </ul> + * + * <p>Expected lock URI formats: + * <ul> + * <li>{@code abfs://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code abfss://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code wasb://<container>@<account>.blob.core.windows.net/<path>}</li> + * <li>{@code wasbs://<container>@<account>.blob.core.windows.net/<path>}</li> + * </ul> + * + * <p>Authentication precedence (via {@link Properties}): + * <ul> + * <li>{@link AzureStorageLockConfig#AZURE_CONNECTION_STRING}</li> + * <li>{@link AzureStorageLockConfig#AZURE_SAS_TOKEN}</li> + * <li>DefaultAzureCredential</li> + * </ul> + */ +@ThreadSafe +public class AzureStorageLockClient implements StorageLockClient { + + private static final int PRECONDITION_FAILURE_ERROR_CODE = 412; + private static final int NOT_FOUND_ERROR_CODE = 404; + private static final int CONFLICT_ERROR_CODE = 409; + private static final int RATE_LIMIT_ERROR_CODE = 429; + private static final int INTERNAL_SERVER_ERROR_CODE_MIN = 500; + + public static final String AZURE_CONNECTION_STRING = AzureStorageLockConfig.AZURE_CONNECTION_STRING.key(); + public static final String AZURE_SAS_TOKEN = AzureStorageLockConfig.AZURE_SAS_TOKEN.key(); + + private final Logger logger; + private final BlobServiceClient blobServiceClient; + private final Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier; + private final ConcurrentMap<String, BlobServiceClient> secondaryBlobServiceClients; + private final BlobClient lockBlobClient; + private final Properties clientProperties; + private final String ownerId; + private final String lockFileUri; + 
private final String lockBlobEndpoint; + + /** + * Constructor used by reflection by {@link org.apache.hudi.client.transaction.lock.StorageBasedLockProvider}. + * + * @param ownerId lock owner id + * @param lockFileUri lock file URI (abfs/abfss/wasb/wasbs) + * @param props properties used to customize/authenticate the Azure client + */ + public AzureStorageLockClient(String ownerId, String lockFileUri, Properties props) { + this(ownerId, lockFileUri, props, createDefaultBlobServiceClient(), LoggerFactory.getLogger(AzureStorageLockClient.class)); + } + + @VisibleForTesting + AzureStorageLockClient( + String ownerId, + String lockFileUri, + Properties props, + Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier, + Logger logger) { + this.ownerId = ownerId; + this.lockFileUri = lockFileUri; + this.logger = logger; + this.clientProperties = props; + this.blobServiceClientSupplier = blobServiceClientSupplier; + this.secondaryBlobServiceClients = new ConcurrentHashMap<>(); + + AzureLocation location = parseAzureLocation(lockFileUri).withProperties(props); + this.lockBlobEndpoint = location.blobEndpoint; + this.blobServiceClient = blobServiceClientSupplier.apply(location); + BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(location.container); + this.lockBlobClient = containerClient.getBlobClient(location.blobPath); + } + + private static Functions.Function1<AzureLocation, BlobServiceClient> createDefaultBlobServiceClient() { + return (location) -> { + Properties props = location.props; + BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); + configureAzureClientOptions(builder, props); + + String connectionString = props == null ? null : props.getProperty(AZURE_CONNECTION_STRING); + if (connectionString != null && !connectionString.trim().isEmpty()) { + return builder.connectionString(connectionString).buildClient(); + } + + builder.endpoint(location.blobEndpoint); + String sasToken = props == null ? 
null : props.getProperty(AZURE_SAS_TOKEN); + if (sasToken != null && !sasToken.trim().isEmpty()) { + String cleaned = sasToken.startsWith("?") ? sasToken.substring(1) : sasToken; + return builder.credential(new AzureSasCredential(cleaned)).buildClient(); + } + + return builder.credential(new DefaultAzureCredentialBuilder().build()).buildClient(); Review Comment: Should we use different credential builder here for typical azure production setups? (not very fam myself there). Asking since IIRC `HoodieAWSConfig` supports role ARN, access key, secret key, session token etc.. ########## packaging/hudi-azure-bundle/pom.xml: ########## @@ -0,0 +1,179 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>hudi</artifactId> + <groupId>org.apache.hudi</groupId> + <version>1.2.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>hudi-azure-bundle</artifactId> + <packaging>jar</packaging> + + <properties> + <checkstyle.skip>true</checkstyle.skip> + <main.basedir>${project.parent.basedir}</main.basedir> + <skipTests>true</skipTests> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${maven-shade-plugin.version}</version> + <executions> + <execution> + <phase>package</phase> Review Comment: +1 it would be great if you can add status on that ########## hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java: ########## @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.azure.transaction.lock; + +import org.apache.hudi.client.transaction.lock.StorageLockClient; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.util.Functions; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.config.AzureStorageLockConfig; +import org.apache.hudi.config.StorageBasedLockConfig; +import org.apache.hudi.common.config.TypedProperties; + +import com.azure.core.credential.AzureSasCredential; +import com.azure.core.exception.HttpResponseException; +import com.azure.core.http.policy.ExponentialBackoffOptions; +import com.azure.core.http.policy.RetryOptions; +import com.azure.core.http.rest.Response; +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.HttpClientOptions; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.BlobUrlParts; +import com.azure.storage.blob.models.BlobErrorCode; +import com.azure.storage.blob.models.BlobRequestConditions; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.BlockBlobItem; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import lombok.AllArgsConstructor; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; + +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys; + +/** + * Azure Storage implementation of {@link StorageLockClient} using Azure Blob conditional requests. + * + * <p>Supports the following URI schemes: + * <ul> + * <li>ADLS Gen2: {@code abfs://} and {@code abfss://}</li> + * <li>Azure Blob Storage: {@code wasb://} and {@code wasbs://}</li> + * </ul> + * + * + * <ul> + * <li>Create: conditional write with If-None-Match: *</li> + * <li>Update/Renew/Expire: conditional write with If-Match: <etag></li> + * </ul> + * + * <p>Expected lock URI formats: + * <ul> + * <li>{@code abfs://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code abfss://<container>@<account>.dfs.core.windows.net/<path>}</li> + * <li>{@code wasb://<container>@<account>.blob.core.windows.net/<path>}</li> + * <li>{@code wasbs://<container>@<account>.blob.core.windows.net/<path>}</li> + * </ul> + * + * <p>Authentication precedence (via {@link Properties}): + * <ul> + * <li>{@link AzureStorageLockConfig#AZURE_CONNECTION_STRING}</li> + * <li>{@link AzureStorageLockConfig#AZURE_SAS_TOKEN}</li> + * <li>DefaultAzureCredential</li> + * </ul> + */ +@ThreadSafe +public class AzureStorageLockClient implements StorageLockClient { + + private static final int PRECONDITION_FAILURE_ERROR_CODE = 412; + private static final int NOT_FOUND_ERROR_CODE = 404; + private static final int CONFLICT_ERROR_CODE = 409; + private static final int RATE_LIMIT_ERROR_CODE = 429; + private static final int INTERNAL_SERVER_ERROR_CODE_MIN = 500; + + public static final String AZURE_CONNECTION_STRING = AzureStorageLockConfig.AZURE_CONNECTION_STRING.key(); 
+ public static final String AZURE_SAS_TOKEN = AzureStorageLockConfig.AZURE_SAS_TOKEN.key(); + + private final Logger logger; + private final BlobServiceClient blobServiceClient; + private final Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier; + private final ConcurrentMap<String, BlobServiceClient> secondaryBlobServiceClients; + private final BlobClient lockBlobClient; + private final Properties clientProperties; + private final String ownerId; + private final String lockFileUri; + private final String lockBlobEndpoint; + + /** + * Constructor used by reflection by {@link org.apache.hudi.client.transaction.lock.StorageBasedLockProvider}. + * + * @param ownerId lock owner id + * @param lockFileUri lock file URI (abfs/abfss/wasb/wasbs) + * @param props properties used to customize/authenticate the Azure client + */ + public AzureStorageLockClient(String ownerId, String lockFileUri, Properties props) { + this(ownerId, lockFileUri, props, createDefaultBlobServiceClient(), LoggerFactory.getLogger(AzureStorageLockClient.class)); + } + + @VisibleForTesting + AzureStorageLockClient( + String ownerId, + String lockFileUri, + Properties props, + Functions.Function1<AzureLocation, BlobServiceClient> blobServiceClientSupplier, + Logger logger) { + this.ownerId = ownerId; + this.lockFileUri = lockFileUri; + this.logger = logger; + this.clientProperties = props; + this.blobServiceClientSupplier = blobServiceClientSupplier; + this.secondaryBlobServiceClients = new ConcurrentHashMap<>(); + + AzureLocation location = parseAzureLocation(lockFileUri).withProperties(props); + this.lockBlobEndpoint = location.blobEndpoint; + this.blobServiceClient = blobServiceClientSupplier.apply(location); + BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(location.container); + this.lockBlobClient = containerClient.getBlobClient(location.blobPath); + } + + private static Functions.Function1<AzureLocation, BlobServiceClient> 
createDefaultBlobServiceClient() { + return (location) -> { + Properties props = location.props; + BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); + configureAzureClientOptions(builder, props); + + String connectionString = props == null ? null : props.getProperty(AZURE_CONNECTION_STRING); + if (connectionString != null && !connectionString.trim().isEmpty()) { + return builder.connectionString(connectionString).buildClient(); + } + + builder.endpoint(location.blobEndpoint); + String sasToken = props == null ? null : props.getProperty(AZURE_SAS_TOKEN); + if (sasToken != null && !sasToken.trim().isEmpty()) { + String cleaned = sasToken.startsWith("?") ? sasToken.substring(1) : sasToken; + return builder.credential(new AzureSasCredential(cleaned)).buildClient(); + } + + return builder.credential(new DefaultAzureCredentialBuilder().build()).buildClient(); + }; + } + + private static void configureAzureClientOptions(BlobServiceClientBuilder builder, Properties props) { + // Set Azure SDK timeouts based on lock validity to avoid long-hanging calls. + TypedProperties typedProps = new TypedProperties(); + if (props != null) { + typedProps.putAll(props); + } + long validityTimeoutSecs; + try { + validityTimeoutSecs = getLongWithAltKeys(typedProps, StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS); + } catch (NumberFormatException e) { + validityTimeoutSecs = StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.defaultValue(); + } + long azureCallTimeoutSecs = Math.max(1, validityTimeoutSecs / 5); + + // Disable automatic SDK retries; Hudi manages retries at the lock-provider level. 
+ ExponentialBackoffOptions exponentialOptions = new ExponentialBackoffOptions().setMaxRetries(0); + RetryOptions retryOptions = new RetryOptions(exponentialOptions); + + HttpClientOptions clientOptions = new HttpClientOptions() + .setResponseTimeout(Duration.ofSeconds(azureCallTimeoutSecs)) + .setReadTimeout(Duration.ofSeconds(azureCallTimeoutSecs)); + + builder.retryOptions(retryOptions).clientOptions(clientOptions); + } + + @Override + public Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile( + StorageLockData newLockData, + Option<StorageLockFile> previousLockFile) { + String expectedEtag = previousLockFile.isPresent() ? previousLockFile.get().getVersionId() : null; + try { + StorageLockFile updated = createOrUpdateLockFileInternal(newLockData, expectedEtag); + return Pair.of(LockUpsertResult.SUCCESS, Option.of(updated)); + } catch (BlobStorageException e) { + return Pair.of(handleUpsertBlobStorageException(e), Option.empty()); + } catch (HttpResponseException e) { + logger.error("OwnerId: {}, Unexpected Azure SDK error while writing lock file: {}", + ownerId, lockFileUri, e); + if (!previousLockFile.isPresent()) { + // For create, fail fast since this indicates a larger issue. + throw e; + } + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } catch (Exception e) { + logger.error("OwnerId: {}, Unexpected error while writing lock file: {}", ownerId, lockFileUri, e); + return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty()); + } + } + + @Override + public Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile() { + try { + Response<BinaryData> response = lockBlobClient.downloadContentWithResponse(null, null, null, Context.NONE); + String eTag = response.getHeaders() != null ? 
response.getHeaders().getValue("ETag") : null; + StorageLockFile lockFile = StorageLockFile.createFromStream(response.getValue().toStream(), eTag); + return Pair.of(LockGetResult.SUCCESS, Option.of(lockFile)); + } catch (BlobStorageException e) { + return Pair.of(handleGetStorageException(e), Option.empty()); + } + } + + private LockGetResult handleGetStorageException(BlobStorageException e) { + int code = e.getStatusCode(); + if (code == NOT_FOUND_ERROR_CODE || e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) { + logger.info("OwnerId: {}, Object not found in the path: {}", ownerId, lockFileUri); + return LockGetResult.NOT_EXISTS; + } else if (code == RATE_LIMIT_ERROR_CODE) { + logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", ownerId, lockFileUri); + } else if (code >= INTERNAL_SERVER_ERROR_CODE_MIN) { + logger.warn("OwnerId: {}, Azure returned internal server error code for lock file: {}", ownerId, lockFileUri, e); + } else { + throw e; + } + return LockGetResult.UNKNOWN_ERROR; + } + + private StorageLockFile createOrUpdateLockFileInternal(StorageLockData lockData, String expectedEtag) { + byte[] bytes = StorageLockFile.toByteArray(lockData); + BlobRequestConditions conditions = new BlobRequestConditions(); + if (expectedEtag == null) { + conditions.setIfNoneMatch("*"); + } else { + conditions.setIfMatch(expectedEtag); + } + + BlobParallelUploadOptions options = new BlobParallelUploadOptions(BinaryData.fromBytes(bytes)) + .setRequestConditions(conditions); + Response<BlockBlobItem> response = lockBlobClient.uploadWithResponse(options, null, Context.NONE); + String newEtag = response.getHeaders() != null ? 
response.getHeaders().getValue("ETag") : null; + if (newEtag == null && response.getValue() != null) { + newEtag = response.getValue().getETag(); + } + if (newEtag == null || newEtag.isEmpty()) { + throw new HoodieLockException("Missing ETag in Azure upload response for lock file: " + lockFileUri); + } + return new StorageLockFile(lockData, newEtag); + } + + private LockUpsertResult handleUpsertBlobStorageException(BlobStorageException e) { + int code = e.getStatusCode(); + if (code == PRECONDITION_FAILURE_ERROR_CODE || e.getErrorCode() == BlobErrorCode.CONDITION_NOT_MET) { + logger.info("OwnerId: {}, Unable to write new lock file. Another process has modified this lockfile {} already.", + ownerId, lockFileUri); + return LockUpsertResult.ACQUIRED_BY_OTHERS; + } else if (code == CONFLICT_ERROR_CODE) { + logger.info("OwnerId: {}, Retriable conditional request conflict error: {}", ownerId, lockFileUri); + } else if (code == RATE_LIMIT_ERROR_CODE) { + logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", ownerId, lockFileUri); + } else if (code >= INTERNAL_SERVER_ERROR_CODE_MIN) { + logger.warn("OwnerId: {}, Azure returned internal server error code for lock file: {}", ownerId, lockFileUri, e); + } else { + logger.warn("OwnerId: {}, Error writing lock file: {}", ownerId, lockFileUri, e); Review Comment: Should we `throw e` here instead, since it's an UNKNOWN case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
