jun-he commented on a change in pull request #3407:
URL: https://github.com/apache/iceberg/pull/3407#discussion_r739373790



##########
File path: python/src/iceberg/partition_spec.py
##########
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from urllib import parse
+from collections import defaultdict
+from typing import List, Optional
+
+from iceberg.partition_field import PartitionField
+from iceberg.schema import Schema
+from iceberg.types import Type, NestedField
+from iceberg.validation_exception import ValidationException
+
+PARTITION_DATA_ID_START = 1000
+
+
+class PartitionSpec(object):
+    fields_by_source_id: defaultdict[list] = None
+    field_list: List[PartitionField] = None
+
+    def __init__(
+        self,
+        schema: Schema,
+        spec_id: int,
+        part_fields: List[PartitionField],
+        last_assigned_field_id: int,
+    ):
+        self._schema = schema
+        self._spec_id = spec_id
+        self._fields = [part_field for part_field in part_fields]
+        self._last_assigned_field_id = last_assigned_field_id
+
+    @property
+    def schema(self) -> Schema:
+        return self._schema
+
+    @property
+    def spec_id(self) -> int:
+        return self._spec_id
+
+    @property
+    def fields(self) -> List[PartitionField]:
+        return self._fields
+
+    @property
+    def last_assigned_field_id(self) -> int:
+        return self._last_assigned_field_id
+
+    def is_partitioned(self):
+        return len(self.fields) < 1
+
+    def _generate_fields_by_source_id(self):
+        if self.fields_by_source_id is None:
+            fields_source_to_field_dict = defaultdict(list)
+            for field in self.fields:
+                fields_source_to_field_dict[field.source_id] = [field]
+            return fields_source_to_field_dict
+        return None
+
+    def get_fields_by_source_id(self, field_id: int) -> List[PartitionField]:
+        return self._generate_fields_by_source_id().get(field_id, None)
+
+    def __eq__(self, other):
+        if isinstance(other, PartitionSpec):
+            if self.spec_id != other.spec_id:
+                return False
+            return self.fields == other.fields
+        return False
+
+    def __str__(self):
+        partition_spec_str = "["
+        for field in self.fields:
+            partition_spec_str += "\n"
+            partition_spec_str += " " + field
+        if len(self.fields) > 0:
+            partition_spec_str += "\n"
+        partition_spec_str += "]"
+        return partition_spec_str
+
+    def compatible_with(self, other):
+        if self.__eq__(other):
+            return True
+
+        if len(self.fields) != len(other.fields):
+            return False
+
+        index = 0
+        for field in self.fields:
+            other_field: PartitionField = other.fields[index]
+            if (
+                field.source_id != other_field.source_id
+                or field.name != other_field.name
+            ):
+                index += 1
+                # TODO: Add transform check
+                return False
+        return True
+
+    def partition_type(self):
+        struct_fields = []
+        # TODO: Needs transform
+        pass
+
+    def escape(self, input_str):
+        try:
+            return parse.urlencode(input_str, encoding="utf-8")
+        except TypeError as e:
+            raise e
+
+    def partition_to_path(self):
+        # TODO: Needs transform
+        pass
+
+    def _generate_unpartitioned_spec(self):
+        return PartitionSpec(
+            schema=Schema(),
+            spec_id=0,
+            part_fields=[],
+            last_assigned_field_id=PARTITION_DATA_ID_START - 1,
+        )
+
+    def unpartitioned(self) -> PartitionSpec:
+        return self._generate_unpartitioned_spec()
+
+    def check_compatibility(self, spec: PartitionSpec, schema: Schema):
+        for field in spec.fields:
+            source_type = schema.find_type(field.source_id)
+            ValidationException().check(
+                source_type != None,
+                f"Cannot find source column for partition field: {field}",
+            )
+            ValidationException().check(
+                source_type.is_primitive,
+                f"Cannot partition by non-primitive source field: {source_type}",
+            )
+            # TODO: Add transform check
+
+    def has_sequential_ids(self, spec: PartitionSpec):
+        index = 0
+        for field in spec.fields:
+            if field.field_id != PARTITION_DATA_ID_START + index:
+                return False
+            index += 1
+        return True
+
+
+class AtomicInteger:
+    # TODO: Move to utils
+    def __init__(self, value=0):
+        self._value = int(value)
+        self._lock = threading.Lock()
+
+    def inc(self, d=1):
+        with self._lock:
+            self._value += int(d)
+            return self._value
+
+    def dec(self, d=1):
+        return self.inc(-d)
+
+    @property
+    def value(self):
+        with self._lock:
+            return self._value
+
+    @value.setter
+    def value(self, v):
+        with self._lock:
+            self._value = int(v)
+            return self._value
+
+
+class Builder(object):

Review comment:
       I like constructing objects with this kind of pattern, but I think we
can achieve it in a more Pythonic way, without using a typical Java-style
builder. For example:
   ```
   def buildermethod(func):
     def wrapper(self, *args, **kwargs):
       func(self, *args, **kwargs)
       return self
     return wrapper
   
   class PartitionSpec:
     def __init__(self):
       pass
   
     @buildermethod
     def with_schema(self, schema):
       # additional checks
       self._schema = schema
   ...
   
   partition_spec = PartitionSpec().with_schema(schema)...
   ```
   Also, note that in Python we won't be able to get the immutability that a
Java builder offers.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to