Copilot commented on code in PR #18165:
URL: https://github.com/apache/pinot/pull/18165#discussion_r3067146988


##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/metadata/ColumnPartitionMetadata.java:
##########
@@ -180,8 +194,10 @@ public ColumnPartitionMetadata deserialize(JsonParser p, 
DeserializationContext
             });
       }
 
+      JsonNode functionExprNode = jsonMetadata.get(FUNCTION_EXPR_KEY);
       return new 
ColumnPartitionMetadata(jsonMetadata.get(FUNCTION_NAME_KEY).asText(),
-          jsonMetadata.get(NUM_PARTITIONS_KEY).asInt(), partitions, 
functionConfig);
+          jsonMetadata.get(NUM_PARTITIONS_KEY).asInt(), partitions, 
functionConfig,
+          functionExprNode != null ? functionExprNode.asText() : null);

Review Comment:
   ColumnPartitionMetadataDeserializer treats a JSON null `functionExpr` as the 
literal string "null" via `asText()`. Because 
`SegmentPartitionMetadata.toJsonString()` uses the default JsonUtils 
ObjectMapper (which includes null fields), non-expression partition metadata 
will serialize as `"functionExpr":null` and deserialize to the string "null"; 
downstream code will then attempt to compile/evaluate an invalid expression 
instead of treating it as absent. Update the deserializer to treat a missing or 
JSON-null `functionExpr` as Java null (e.g., check `functionExprNode != null && 
!functionExprNode.isNull()` and optionally 
`!functionExprNode.asText().isBlank()`).



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionFunctionExprCompiler.java:
##########
@@ -0,0 +1,897 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.partition.pipeline;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Array;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.annotations.ScalarFunction;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+/**
+ * Compiles a restricted partition-function expression into a typed {@link 
PartitionPipeline} backed by deterministic
+ * scalar functions.
+ */
+public final class PartitionFunctionExprCompiler {
+  private static final int MAX_EXPRESSION_LENGTH = 256;
+  private static final int MAX_DEPTH = 8;
+  private static final int MAX_ARGUMENTS = 32;
+  private static final String FUNCTION_REGEX = ".*\\.function\\..*";
+  private static final Object DYNAMIC_ARGUMENT = new Object();
+  private static final Set<String> MASK_NORMALIZER_FUNCTIONS = 
Collections.unmodifiableSet(new java.util.HashSet<>(
+      List.of("murmur2", "murmurhash2", "murmurhash2utf8", "murmur332", 
"murmurhash3bit32", "fnv132",
+          "fnv1hash32", "fnv1a32", "fnv1ahash32", "fnv164", "fnv1hash64", 
"fnv1a64", "fnv1ahash64")));
+
+  private static final Map<String, List<ScalarFunctionMethod>> 
FUNCTION_METHODS = loadFunctions();
+
+  private PartitionFunctionExprCompiler() {
+  }
+
+  public static PartitionPipeline compile(String rawColumn, String 
functionExpr) {
+    Preconditions.checkArgument(hasText(rawColumn), "Raw column must be 
configured");
+    Preconditions.checkArgument(hasText(functionExpr), "'functionExpr' must be 
configured");
+    Preconditions.checkArgument(functionExpr.length() <= MAX_EXPRESSION_LENGTH,
+        "'functionExpr' must be <= %s characters", MAX_EXPRESSION_LENGTH);
+
+    Parser parser = new Parser(functionExpr);
+    Node root = parser.parse();
+    List<PartitionStep> steps = new ArrayList<>();
+    CompileResult result = compile(rawColumn, root, steps, 0);
+    Preconditions.checkArgument(result._dynamic,
+        "Partition function expression must reference partition column '%s'", 
rawColumn);
+    return new PartitionPipeline(rawColumn, PartitionValueType.STRING, 
result._outputType, result._canonicalExpr,
+        result._intNormalizer, steps);
+  }
+
+  public static PartitionPipelineFunction compilePartitionFunction(String 
rawColumn, String functionExpr,
+      int numPartitions) {
+    return new PartitionPipelineFunction(compile(rawColumn, functionExpr), 
numPartitions);
+  }
+
+  private static CompileResult compile(String rawColumn, Node node, 
List<PartitionStep> steps, int depth) {
+    Preconditions.checkArgument(depth <= MAX_DEPTH,
+        "Partition function expression depth exceeds the maximum of %s", 
MAX_DEPTH);
+    if (node instanceof IdentifierNode) {
+      IdentifierNode identifierNode = (IdentifierNode) node;
+      Preconditions.checkArgument(identifierNode._name.equals(rawColumn),
+          "Partition function expression must reference only partition column 
'%s', got '%s'",
+          rawColumn, identifierNode._name);
+      return CompileResult.dynamic(PartitionValueType.STRING, rawColumn, null);
+    }
+    if (node instanceof LiteralNode) {
+      LiteralNode literalNode = (LiteralNode) node;
+      return CompileResult.constant(literalNode._value.getType(), 
literalNode._canonicalForm, literalNode._value);
+    }
+
+    FunctionNode functionNode = (FunctionNode) node;
+    List<CompileResult> arguments = new 
ArrayList<>(functionNode._arguments.size());
+    int dynamicCount = 0;
+    int dynamicIndex = -1;
+    for (int i = 0; i < functionNode._arguments.size(); i++) {
+      CompileResult argument = compile(rawColumn, 
functionNode._arguments.get(i), steps, depth + 1);
+      arguments.add(argument);
+      if (argument._dynamic) {
+        dynamicCount++;
+        dynamicIndex = i;
+      }
+    }
+    Preconditions.checkArgument(dynamicCount <= 1,
+        "Partition function expression must reference partition column '%s' 
through a single argument chain",
+        rawColumn);
+
+    String displayName = functionNode._name.toLowerCase(Locale.ROOT);
+    BoundFunction bestMatch = resolve(displayName, arguments);
+    String canonicalExpr = toCanonicalExpr(displayName, arguments);
+    if (!bestMatch.isDynamic()) {
+      return CompileResult.constant(bestMatch.getOutputType(), canonicalExpr, 
bestMatch.invoke(null));
+    }
+
+    steps.add(bestMatch.toStep(displayName));
+    return CompileResult.dynamic(bestMatch.getOutputType(), canonicalExpr,
+        resolveIntNormalizer(displayName, bestMatch.getOutputType(), 
arguments.get(dynamicIndex)));
+  }
+
+  private static BoundFunction resolve(String functionName, 
List<CompileResult> arguments) {
+    String canonicalName = canonicalize(functionName);
+    List<ScalarFunctionMethod> methods = FUNCTION_METHODS.get(canonicalName);
+    Preconditions.checkArgument(methods != null, "Unsupported partition scalar 
function: %s", functionName);
+
+    BoundFunction bestMatch = null;
+    StringBuilder supportedSignatures = new StringBuilder();
+    boolean sawNonDeterministicCandidate = false;
+    boolean sawDeterministicCandidate = false;
+    for (ScalarFunctionMethod method : methods) {
+      if (supportedSignatures.length() > 0) {
+        supportedSignatures.append(", ");
+      }
+      supportedSignatures.append(method.getSignature());
+
+      if (!method.isDeterministic()) {
+        sawNonDeterministicCandidate = true;
+        continue;
+      }
+      sawDeterministicCandidate = true;
+
+      BoundFunction candidate = method.bind(arguments);
+      if (candidate != null) {
+        if (bestMatch == null || candidate.getCost() < bestMatch.getCost()) {
+          bestMatch = candidate;
+        } else if (candidate.getCost() == bestMatch.getCost()) {
+          throw new IllegalArgumentException(String.format(
+              "Ambiguous partition scalar function '%s' for argument types 
(%s). Matching signatures: %s",
+              functionName, formatArgumentTypes(arguments), 
supportedSignatures));
+        }
+      }
+    }
+
+    if (bestMatch != null) {
+      return bestMatch;
+    }
+    if (sawNonDeterministicCandidate && !sawDeterministicCandidate) {
+      throw new IllegalArgumentException(String.format(
+          "Partition scalar function '%s' is not allowed because it is 
non-deterministic", functionName));
+    }
+    throw new IllegalArgumentException(String.format(
+        "Function '%s' does not accept argument types (%s). Supported 
signatures: %s", functionName,
+        formatArgumentTypes(arguments), supportedSignatures));
+  }
+
+  @Nullable
+  private static PartitionIntNormalizer resolveIntNormalizer(String 
functionName, PartitionValueType outputType,
+      CompileResult child) {
+    if (!outputType.isIntegral()) {
+      return null;
+    }
+    String canonicalName = canonicalize(functionName);
+    if ("identity".equals(canonicalName)) {
+      Preconditions.checkState(child._intNormalizer != null,
+          "Integral identity step must preserve a child INT normalizer");
+      return child._intNormalizer;
+    }
+    if (MASK_NORMALIZER_FUNCTIONS.contains(canonicalName)) {
+      return PartitionIntNormalizer.MASK;
+    }
+    return PartitionIntNormalizer.POSITIVE_MODULO;
+  }
+
+  private static Map<String, List<ScalarFunctionMethod>> loadFunctions() {
+    Map<String, List<ScalarFunctionMethod>> functionMethods = new HashMap<>();
+    for (Method method : 
PinotReflectionUtils.getMethodsThroughReflection(FUNCTION_REGEX, 
ScalarFunction.class)) {
+      if (!Modifier.isPublic(method.getModifiers())) {
+        continue;
+      }
+      ScalarFunction scalarFunction = 
method.getAnnotation(ScalarFunction.class);
+      if (scalarFunction == null || !scalarFunction.enabled()) {
+        continue;
+      }
+      if (!isSupportedMethod(method)) {
+        continue;
+      }
+      if (!Modifier.isStatic(method.getModifiers()) && 
getEmptyConstructor(method.getDeclaringClass()) == null) {
+        continue;
+      }
+
+      ScalarFunctionMethod functionMethod = new ScalarFunctionMethod(method, 
scalarFunction.isDeterministic());
+      String[] names = scalarFunction.names();
+      if (names.length == 0) {
+        register(functionMethods, method.getName(), functionMethod);
+      } else {
+        for (String name : names) {
+          register(functionMethods, name, functionMethod);
+        }
+      }
+    }
+
+    Map<String, List<ScalarFunctionMethod>> immutable = new 
HashMap<>(functionMethods.size());
+    for (Map.Entry<String, List<ScalarFunctionMethod>> entry : 
functionMethods.entrySet()) {
+      immutable.put(entry.getKey(), 
Collections.unmodifiableList(entry.getValue()));
+    }
+    return Collections.unmodifiableMap(immutable);
+  }
+
+  private static void register(Map<String, List<ScalarFunctionMethod>> 
functionMethods, String name,
+      ScalarFunctionMethod functionMethod) {
+    functionMethods.computeIfAbsent(canonicalize(name), ignored -> new 
ArrayList<>()).add(functionMethod);
+  }
+
+  private static boolean isSupportedMethod(Method method) {
+    try {
+      PartitionValueType.fromJavaType(method.getReturnType());
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+
+    Class<?>[] parameterTypes = method.getParameterTypes();
+    int lastIndex = parameterTypes.length - 1;
+    for (int i = 0; i < parameterTypes.length; i++) {
+      Class<?> parameterType = parameterTypes[i];
+      if (method.isVarArgs() && i == lastIndex) {
+        if (!parameterType.isArray() || parameterType == byte[].class) {
+          return false;
+        }
+        try {
+          PartitionValueType.fromJavaType(parameterType.getComponentType());
+        } catch (IllegalArgumentException e) {
+          return false;
+        }
+      } else {
+        try {
+          PartitionValueType.fromJavaType(parameterType);
+        } catch (IllegalArgumentException e) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  private static String formatArgumentTypes(List<CompileResult> arguments) {
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < arguments.size(); i++) {
+      if (i > 0) {
+        builder.append(", ");
+      }
+      builder.append(arguments.get(i)._outputType);
+    }
+    return builder.toString();
+  }
+
+  private static String toCanonicalExpr(String functionName, 
List<CompileResult> arguments) {
+    StringBuilder builder = new StringBuilder(functionName).append('(');
+    for (int i = 0; i < arguments.size(); i++) {
+      if (i > 0) {
+        builder.append(", ");
+      }
+      builder.append(arguments.get(i)._canonicalExpr);
+    }
+    return builder.append(')').toString();
+  }
+
+  private static String canonicalize(String name) {
+    StringBuilder builder = new StringBuilder(name.length());
+    for (int i = 0; i < name.length(); i++) {
+      char character = name.charAt(i);
+      if (character != '_') {
+        builder.append(Character.toLowerCase(character));
+      }
+    }
+    return builder.toString();
+  }
+
+  private static boolean hasText(@Nullable String value) {
+    return value != null && !value.trim().isEmpty();
+  }
+
+  private abstract static class Node {
+  }
+
+  private static final class IdentifierNode extends Node {
+    private final String _name;
+
+    private IdentifierNode(String name) {
+      _name = name;
+    }
+  }
+
+  private static final class LiteralNode extends Node {
+    private final PartitionValue _value;
+    private final String _canonicalForm;
+
+    private LiteralNode(PartitionValue value, String canonicalForm) {
+      _value = value;
+      _canonicalForm = canonicalForm;
+    }
+  }
+
+  private static final class FunctionNode extends Node {
+    private final String _name;
+    private final List<Node> _arguments;
+
+    private FunctionNode(String name, List<Node> arguments) {
+      _name = name;
+      _arguments = arguments;
+    }
+  }
+
+  private static final class CompileResult {
+    private final PartitionValueType _outputType;
+    private final String _canonicalExpr;
+    private final boolean _dynamic;
+    @Nullable
+    private final PartitionValue _constantValue;
+    @Nullable
+    private final PartitionIntNormalizer _intNormalizer;
+
+    private CompileResult(PartitionValueType outputType, String canonicalExpr, 
boolean dynamic,
+        @Nullable PartitionValue constantValue, @Nullable 
PartitionIntNormalizer intNormalizer) {
+      _outputType = outputType;
+      _canonicalExpr = canonicalExpr;
+      _dynamic = dynamic;
+      _constantValue = constantValue;
+      _intNormalizer = intNormalizer;
+    }
+
+    private static CompileResult dynamic(PartitionValueType outputType, String 
canonicalExpr,
+        @Nullable PartitionIntNormalizer intNormalizer) {
+      return new CompileResult(outputType, canonicalExpr, true, null, 
intNormalizer);
+    }
+
+    private static CompileResult constant(PartitionValueType outputType, 
String canonicalExpr, PartitionValue value) {
+      return new CompileResult(outputType, canonicalExpr, false, value, null);
+    }
+  }
+
+  private static final class ScalarFunctionMethod {
+    private final Method _method;
+    private final boolean _staticMethod;
+    @Nullable
+    private final ThreadLocal<Object> _threadLocalTarget;
+    private final Class<?>[] _parameterTypes;
+    private final boolean _varArgs;
+    private final boolean _deterministic;
+    private final PartitionValueType _outputType;
+    private final String _signature;
+
+    private ScalarFunctionMethod(Method method, boolean deterministic) {
+      _method = method;
+      _staticMethod = Modifier.isStatic(method.getModifiers());
+      _parameterTypes = method.getParameterTypes();
+      _varArgs = method.isVarArgs();
+      _deterministic = deterministic;
+      _outputType = PartitionValueType.fromJavaType(method.getReturnType());
+      _signature = buildSignature(method);
+      _threadLocalTarget = _staticMethod ? null : 
buildThreadLocalTarget(method);
+    }
+
+    public boolean isDeterministic() {
+      return _deterministic;
+    }
+
+    public String getSignature() {
+      return _signature;
+    }
+
+    @Nullable
+    public BoundFunction bind(List<CompileResult> arguments) {
+      int parameterCount = _parameterTypes.length;
+      int fixedParameterCount = _varArgs ? parameterCount - 1 : parameterCount;
+      if ((!_varArgs && arguments.size() != parameterCount) || (_varArgs && 
arguments.size() < fixedParameterCount)) {
+        return null;
+      }
+
+      Object[] constantArguments = new Object[arguments.size()];
+      int totalCost = 0;
+      int dynamicIndex = -1;
+      PartitionValueType inputType = PartitionValueType.STRING;
+      Class<?> dynamicParameterType = String.class;
+      for (int i = 0; i < arguments.size(); i++) {
+        CompileResult argument = arguments.get(i);
+        Class<?> parameterType = getParameterType(i);
+        if (argument._dynamic) {
+          if (dynamicIndex >= 0) {
+            return null;
+          }
+          int cost = getDynamicConversionCost(argument._outputType, 
parameterType);
+          if (cost < 0) {
+            return null;
+          }
+          totalCost += cost;
+          dynamicIndex = i;
+          inputType = argument._outputType;
+          dynamicParameterType = parameterType;
+          constantArguments[i] = DYNAMIC_ARGUMENT;
+        } else {
+          ConvertedConstant convertedConstant = 
convertConstant(argument._constantValue, parameterType);
+          if (convertedConstant == null) {
+            return null;
+          }
+          totalCost += convertedConstant._cost;
+          constantArguments[i] = convertedConstant._value;
+        }
+      }
+      return new BoundFunction(this, inputType, dynamicParameterType, 
dynamicIndex, constantArguments, totalCost);
+    }
+
+    public PartitionValueType getOutputType() {
+      return _outputType;
+    }
+
+    private Class<?> getParameterType(int index) {
+      if (_varArgs && index >= _parameterTypes.length - 1) {
+        return _parameterTypes[_parameterTypes.length - 1].getComponentType();
+      }
+      return _parameterTypes[index];
+    }
+
+    private Object invoke(Object[] expressionArguments) {
+      try {
+        Object target = _staticMethod ? null : 
Preconditions.checkNotNull(_threadLocalTarget).get();
+        return _method.invoke(target, adaptArguments(expressionArguments));
+      } catch (IllegalAccessException e) {
+        throw new IllegalStateException("Failed to invoke partition scalar 
function: " + _signature, e);
+      } catch (InvocationTargetException e) {
+        Throwable cause = e.getTargetException();
+        if (cause instanceof RuntimeException) {
+          throw (RuntimeException) cause;
+        }
+        throw new IllegalStateException("Caught checked exception while 
invoking partition scalar function: "
+            + _signature, cause);
+      }
+    }
+
+    private Object[] adaptArguments(Object[] expressionArguments) {
+      if (!_varArgs) {
+        return expressionArguments;
+      }
+
+      int fixedParameterCount = _parameterTypes.length - 1;
+      Object[] methodArguments = new Object[_parameterTypes.length];
+      for (int i = 0; i < fixedParameterCount; i++) {
+        methodArguments[i] = expressionArguments[i];
+      }
+      Class<?> componentType = _parameterTypes[_parameterTypes.length - 
1].getComponentType();
+      int varArgCount = expressionArguments.length - fixedParameterCount;
+      Object varArgArray = Array.newInstance(componentType, varArgCount);
+      for (int i = 0; i < varArgCount; i++) {
+        Array.set(varArgArray, i, expressionArguments[fixedParameterCount + 
i]);
+      }
+      methodArguments[_parameterTypes.length - 1] = varArgArray;
+      return methodArguments;
+    }
+
+    private static ThreadLocal<Object> buildThreadLocalTarget(Method method) {
+      Constructor<?> constructor = 
getEmptyConstructor(method.getDeclaringClass());
+      Preconditions.checkState(constructor != null,
+          "Non-static partition scalar function must have an empty 
constructor: %s", method);
+      return ThreadLocal.withInitial(() -> instantiateTarget(constructor, 
method));
+    }
+
+    private static Object instantiateTarget(Constructor<?> constructor, Method 
method) {
+      try {
+        return constructor.newInstance();
+      } catch (Exception e) {
+        throw new IllegalStateException("Failed to instantiate partition 
scalar function target: " + method, e);
+      }
+    }
+
+    private static String buildSignature(Method method) {
+      StringBuilder builder = new StringBuilder(method.getName()).append('(');
+      Class<?>[] parameterTypes = method.getParameterTypes();
+      for (int i = 0; i < parameterTypes.length; i++) {
+        if (i > 0) {
+          builder.append(", ");
+        }
+        Class<?> parameterType = parameterTypes[i];
+        if (method.isVarArgs() && i == parameterTypes.length - 1) {
+          
builder.append(parameterType.getComponentType().getSimpleName()).append("...");
+        } else {
+          builder.append(parameterType.getSimpleName());
+        }
+      }
+      return builder.append(')').toString();
+    }
+  }
+
+  @Nullable
+  private static Constructor<?> getEmptyConstructor(Class<?> clazz) {
+    try {
+      return clazz.getConstructor();
+    } catch (NoSuchMethodException e) {
+      return null;
+    }
+  }
+
+  private static final class BoundFunction {
+    private final ScalarFunctionMethod _method;
+    private final PartitionValueType _inputType;
+    private final Class<?> _dynamicParameterType;
+    private final int _dynamicIndex;
+    private final Object[] _constantArguments;
+    private final int _cost;
+
+    private BoundFunction(ScalarFunctionMethod method, PartitionValueType 
inputType, Class<?> dynamicParameterType,
+        int dynamicIndex, Object[] constantArguments, int cost) {
+      _method = method;
+      _inputType = inputType;
+      _dynamicParameterType = dynamicParameterType;
+      _dynamicIndex = dynamicIndex;
+      _constantArguments = constantArguments;
+      _cost = cost;
+    }
+
+    public int getCost() {
+      return _cost;
+    }
+
+    public boolean isDynamic() {
+      return _dynamicIndex >= 0;
+    }
+
+    public PartitionValueType getOutputType() {
+      return _method.getOutputType();
+    }
+
+    public PartitionStep toStep(String displayName) {
+      Preconditions.checkState(isDynamic(), "Cannot create a pipeline step for 
a constant-only function");
+      return new PartitionStep(displayName, _inputType, 
_method.getOutputType(), this::invoke);
+    }
+
+    public PartitionValue invoke(@Nullable PartitionValue dynamicInput) {
+      Object[] expressionArguments = _constantArguments.clone();
+      if (_dynamicIndex >= 0) {
+        Preconditions.checkNotNull(dynamicInput, "Dynamic partition step input 
must be configured");
+        expressionArguments[_dynamicIndex] = convertDynamic(dynamicInput, 
_dynamicParameterType);
+      }
+      return PartitionValue.fromObject(_method.invoke(expressionArguments));
+    }

Review Comment:
   BoundFunction.invoke() clones the full `_constantArguments` array on every 
evaluation. Because invoke() runs once per row during ingestion partitioning 
(e.g., TableConfigPartitioner), this can create significant allocation 
pressure. Consider avoiding per-call array cloning by keeping a prebuilt 
argument array per thread (ThreadLocal) or by storing constant arguments 
separately and only materializing the varargs array when needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to