http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/memory/AttributeStores.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/memory/AttributeStores.java b/repository/src/main/java/org/apache/atlas/repository/memory/AttributeStores.java new file mode 100755 index 0000000..59cf67d --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/memory/AttributeStores.java @@ -0,0 +1,655 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.memory; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import it.unimi.dsi.fastutil.booleans.BooleanArrayList; +import it.unimi.dsi.fastutil.bytes.ByteArrayList; +import it.unimi.dsi.fastutil.doubles.DoubleArrayList; +import it.unimi.dsi.fastutil.floats.FloatArrayList; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.shorts.ShortArrayList; +import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.persistence.StructInstance; +import org.apache.atlas.typesystem.types.AttributeInfo; +import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.IConstructableType; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class AttributeStores { + + private static final Object NULL_VAL = new Object(); + + static IAttributeStore createStore(AttributeInfo i) throws RepositoryException { + switch (i.dataType().getTypeCategory()) { + case PRIMITIVE: + if (i.dataType() == DataTypes.BOOLEAN_TYPE) { + return new BooleanAttributeStore(i); + } else if (i.dataType() == DataTypes.BYTE_TYPE) { + return new ByteAttributeStore(i); + } else if (i.dataType() == DataTypes.SHORT_TYPE) { + new ShortAttributeStore(i); + } else if (i.dataType() == DataTypes.INT_TYPE) { + return new IntAttributeStore(i); + } else if (i.dataType() == DataTypes.LONG_TYPE) { + return new LongAttributeStore(i); + } else if (i.dataType() == DataTypes.FLOAT_TYPE) { + return new FloatAttributeStore(i); + } else if (i.dataType() == DataTypes.DOUBLE_TYPE) { + return new DoubleAttributeStore(i); + } else if (i.dataType() == DataTypes.BIGINTEGER_TYPE) { + return new BigIntStore(i); + } else if (i.dataType() == DataTypes.BIGDECIMAL_TYPE) { + return new BigDecimalStore(i); + } else if (i.dataType() == DataTypes.DATE_TYPE) { + return new DateStore(i); + } else if (i.dataType() == DataTypes.STRING_TYPE) { + return new StringStore(i); + } else if (i.dataType() == DataTypes.STRING_TYPE) { + return new StringStore(i); + } else { + throw new RepositoryException( + String.format("Unknown datatype %s", i.dataType())); + } + case ENUM: + return new IntAttributeStore(i); + case ARRAY: + return new ImmutableListStore(i); + case MAP: + return new ImmutableMapStore(i); + case STRUCT: + return new StructStore(i); + case CLASS: + return new IdStore(i); + default: + throw new RepositoryException( + String.format("Unknown Category for datatype %s", i.dataType())); + } + } + + static abstract class AbstractAttributeStore implements IAttributeStore { + final BooleanArrayList nullList; + final Map<Integer, Map<String, Object>> hiddenVals; + AttributeInfo attrInfo; + + AbstractAttributeStore(AttributeInfo attrInfo) { + this.attrInfo = attrInfo; + this.nullList = new BooleanArrayList(); + hiddenVals = new HashMap<Integer, Map<String, Object>>(); + } + + final void setNull(int pos, boolean flag) { + nullList.set(pos, flag); + } + + final boolean getNull(int pos) { + return nullList.get(pos); + } + + void storeHiddenVals(int pos, IConstructableType type, StructInstance instance) + throws RepositoryException { + List<String> attrNames = type.getNames(attrInfo); + Map<String, Object> m = hiddenVals.get(pos); + if (m == null) { + m = new HashMap<String, Object>(); + hiddenVals.put(pos, m); + } + for (int i = 2; i < attrNames.size(); i++) { + String attrName = attrNames.get(i); + int nullPos = instance.fieldMapping().fieldNullPos.get(attrName); + int colPos = instance.fieldMapping().fieldPos.get(attrName); + if (instance.nullFlags[nullPos]) { + m.put(attrName, NULL_VAL); + } else { + //m.put(attrName, instance.bools[colPos]); + store(instance, colPos, attrName, m); + } + } + } + + void loadHiddenVals(int pos, IConstructableType type, StructInstance instance) + throws RepositoryException { + List<String> attrNames = type.getNames(attrInfo); + Map<String, Object> m = hiddenVals.get(pos); + for (int i = 2; i < attrNames.size(); i++) { + String attrName = attrNames.get(i); + int nullPos = instance.fieldMapping().fieldNullPos.get(attrName); + int colPos = instance.fieldMapping().fieldPos.get(attrName); + Object val = m == null ? NULL_VAL : m.get(attrName); + if (val == NULL_VAL) { + instance.nullFlags[nullPos] = true; + } else { + instance.nullFlags[nullPos] = false; + load(instance, colPos, val); + } + } + } + + @Override + public void store(int pos, IConstructableType type, StructInstance instance) + throws RepositoryException { + List<String> attrNames = type.getNames(attrInfo); + String attrName = attrNames.get(0); + int nullPos = instance.fieldMapping().fieldNullPos.get(attrName); + int colPos = instance.fieldMapping().fieldPos.get(attrName); + nullList.set(pos, instance.nullFlags[nullPos]); + + if (pos == nullList.size()) { + nullList.add(instance.nullFlags[nullPos]); + } else { + nullList.set(pos, instance.nullFlags[nullPos]); + } + //list.set(pos, instance.bools[colPos]); + store(instance, colPos, pos); + + if (attrNames.size() > 1) { + storeHiddenVals(pos, type, instance); + } + } + + @Override + public void load(int pos, IConstructableType type, StructInstance instance) + throws RepositoryException { + List<String> attrNames = type.getNames(attrInfo); + String attrName = attrNames.get(0); + int nullPos = instance.fieldMapping().fieldNullPos.get(attrName); + int colPos = instance.fieldMapping().fieldPos.get(attrName); + + if (nullList.get(pos)) { + instance.nullFlags[nullPos] = true; + } else { + instance.nullFlags[nullPos] = false; + load(instance, colPos, pos); + } + + if (attrNames.size() > 1) { + loadHiddenVals(pos, type, instance); + } + } + + /* + * store the value from colPos in instance into the list. + */ + protected abstract void store(StructInstance instance, int colPos, int pos) + throws RepositoryException; + + /* + * load the value from pos in list into colPos in instance. + */ + protected abstract void load(StructInstance instance, int colPos, int pos) + throws RepositoryException; + + /* + * store the value from colPos in map as attrName + */ + protected abstract void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m); + + /* + * load the val into colPos in instance. + */ + protected abstract void load(StructInstance instance, int colPos, Object val); + + } + + static abstract class PrimitiveAttributeStore extends AbstractAttributeStore + implements IAttributeStore { + + + public PrimitiveAttributeStore(AttributeInfo attrInfo) { + super(attrInfo); + } + + } + + static class BooleanAttributeStore extends PrimitiveAttributeStore { + + final BooleanArrayList list; + + BooleanAttributeStore(AttributeInfo attrInfo) { + super(attrInfo); + this.list = new BooleanArrayList(); + } + + protected void store(StructInstance instance, int colPos, int pos) { + list.set(pos, instance.bools[colPos]); + } + + protected void load(StructInstance instance, int colPos, int pos) { + instance.bools[colPos] = list.get(pos); + } + + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.bools[colPos]); + } + + protected void load(StructInstance instance, int colPos, Object val) { + instance.bools[colPos] = (Boolean) val; + } + + @Override + public void ensureCapacity(int pos) throws RepositoryException { + list.size(pos + 1); + nullList.size(pos + 1); + } + } + + static class ByteAttributeStore extends PrimitiveAttributeStore { + + final ByteArrayList list; + + ByteAttributeStore(AttributeInfo attrInfo) { + super(attrInfo); + this.list = new ByteArrayList(); + } + + protected void store(StructInstance instance, int colPos, int pos) { + list.set(pos, instance.bytes[colPos]); + } + + protected void load(StructInstance instance, int colPos, int pos) { + instance.bytes[colPos] = list.get(pos); + } + + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.bytes[colPos]); + } + + protected void load(StructInstance instance, int colPos, Object val) { + instance.bytes[colPos] = (Byte) val; + } + + @Override + public void ensureCapacity(int pos) throws RepositoryException { + list.size(pos + 1); + nullList.size(pos + 1); + } + } + + static class ShortAttributeStore extends PrimitiveAttributeStore { + + final ShortArrayList list; + + ShortAttributeStore(AttributeInfo attrInfo) { + super(attrInfo); + this.list = new ShortArrayList(); + } + + protected void store(StructInstance instance, int colPos, int pos) { + list.set(pos, instance.shorts[colPos]); + } + + protected void load(StructInstance instance, int colPos, int pos) { + instance.shorts[colPos] = list.get(pos); + } + + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.shorts[colPos]); + } + + protected void load(StructInstance instance, int colPos, Object val) { + instance.shorts[colPos] = (Short) val; + } + + @Override + public void ensureCapacity(int pos) throws RepositoryException { + list.size(pos + 1); + nullList.size(pos + 1); + } + } + + static class IntAttributeStore extends PrimitiveAttributeStore { + + final IntArrayList list; + + IntAttributeStore(AttributeInfo attrInfo) { + super(attrInfo); + this.list = new IntArrayList(); + } + + protected void store(StructInstance instance, int colPos, int pos) { + list.set(pos, instance.ints[colPos]); + } + + protected void load(StructInstance instance, int colPos, int pos) { + instance.ints[colPos] = list.get(pos); + } + + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.ints[colPos]); + } + + protected void load(StructInstance instance, int colPos, Object val) { + instance.ints[colPos] = (Integer) val; + } + + @Override + public void ensureCapacity(int pos) throws RepositoryException { + list.size(pos + 1); + nullList.size(pos + 1); + } + } + + static class LongAttributeStore extends PrimitiveAttributeStore { + + final LongArrayList list; + + LongAttributeStore(AttributeInfo attrInfo) { + super(attrInfo); + this.list = new LongArrayList(); + } + + protected void store(StructInstance instance, int colPos, int pos) { + list.set(pos, instance.longs[colPos]); + } + + protected void load(StructInstance instance, int colPos, int pos) { + instance.longs[colPos] = list.get(pos); + } + + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.longs[colPos]); + } + + protected void load(StructInstance instance, int colPos, Object val) { + instance.longs[colPos] = (Long) val; + } + + @Override + public void ensureCapacity(int pos) throws RepositoryException { + list.size(pos + 1); + nullList.size(pos + 1); + } + } + + static class FloatAttributeStore extends PrimitiveAttributeStore { + + final FloatArrayList list; + + FloatAttributeStore(AttributeInfo attrInfo) { + super(attrInfo); + this.list = new FloatArrayList(); + } + + protected void store(StructInstance instance, int colPos, int pos) { + list.set(pos, instance.floats[colPos]); + } + + protected void load(StructInstance instance, int colPos, int pos) { + instance.floats[colPos] = list.get(pos); + } + + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.floats[colPos]); + } + + protected void load(StructInstance instance, int colPos, Object val) { + instance.floats[colPos] = (Float) val; + } + + @Override + public void ensureCapacity(int pos) throws RepositoryException { + list.size(pos + 1); + nullList.size(pos + 1); + } + } + + static class DoubleAttributeStore extends PrimitiveAttributeStore { + + final DoubleArrayList list; + + DoubleAttributeStore(AttributeInfo attrInfo) { + super(attrInfo); + this.list = new DoubleArrayList(); + } + + protected void store(StructInstance instance, int colPos, int pos) { + list.set(pos, instance.doubles[colPos]); + } + + protected void load(StructInstance instance, int colPos, int pos) { + instance.doubles[colPos] = list.get(pos); + } + + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.doubles[colPos]); + } + + protected void load(StructInstance instance, int colPos, Object val) { + instance.doubles[colPos] = (Double) val; + } + + @Override + public void ensureCapacity(int pos) throws RepositoryException { + list.size(pos + 1); + nullList.size(pos + 1); + } + } + + static abstract class ObjectAttributeStore<T> extends AbstractAttributeStore { + + final ArrayList<T> list; + + ObjectAttributeStore(Class<T> cls, AttributeInfo attrInfo) { + super(attrInfo); + this.list = Lists.newArrayList((T) null); + } + + @Override + public void ensureCapacity(int pos) throws RepositoryException { + while (list.size() < pos + 1) { + list.add((T) null); + } + nullList.size(pos + 1); + } + } + + static class BigIntStore extends ObjectAttributeStore<BigInteger> { + + public BigIntStore(AttributeInfo attrInfo) { + super(BigInteger.class, attrInfo); + } + + protected void store(StructInstance instance, int colPos, int pos) { + list.set(pos, instance.bigIntegers[colPos]); + } + + protected void load(StructInstance instance, int colPos, int pos) { + instance.bigIntegers[colPos] = list.get(pos); + } + + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.bigIntegers[colPos]); + } + + protected void load(StructInstance instance, int colPos, Object val) { + instance.bigIntegers[colPos] = (BigInteger) val; + } + + } + + static class BigDecimalStore extends ObjectAttributeStore<BigDecimal> { + + public BigDecimalStore(AttributeInfo attrInfo) { + super(BigDecimal.class, attrInfo); + } + + protected void store(StructInstance instance, int colPos, int pos) { + list.set(pos, instance.bigDecimals[colPos]); + } + + protected void load(StructInstance instance, int colPos, int pos) { + instance.bigDecimals[colPos] = list.get(pos); + } + + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.bigDecimals[colPos]); + } + + protected void load(StructInstance instance, int colPos, Object val) { + instance.bigDecimals[colPos] = (BigDecimal) val; + } + + } + + static class DateStore extends ObjectAttributeStore<Date> { + + public DateStore(AttributeInfo attrInfo) { + super(Date.class, attrInfo); + } + + protected void store(StructInstance instance, int colPos, int pos) { + list.set(pos, instance.dates[colPos]); + } + + protected void load(StructInstance instance, int colPos, int pos) { + instance.dates[colPos] = list.get(pos); + } + + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.dates[colPos]); + } + + protected void load(StructInstance instance, int colPos, Object val) { + instance.dates[colPos] = (Date) val; + } + + } + + static class StringStore extends ObjectAttributeStore<String> { + + public StringStore(AttributeInfo attrInfo) { + super(String.class, attrInfo); + } + + protected void store(StructInstance instance, int colPos, int pos) { + list.set(pos, instance.strings[colPos]); + } + + protected void load(StructInstance instance, int colPos, int pos) { + instance.strings[colPos] = list.get(pos); + } + + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.strings[colPos]); + } + + protected void load(StructInstance instance, int colPos, Object val) { + instance.strings[colPos] = (String) val; + } + + } + + static class IdStore extends ObjectAttributeStore<Id> { + + public IdStore(AttributeInfo attrInfo) { + super(Id.class, attrInfo); + } + + protected void store(StructInstance instance, int colPos, int pos) { + list.set(pos, instance.ids[colPos]); + } + + protected void load(StructInstance instance, int colPos, int pos) { + instance.ids[colPos] = list.get(pos); + } + + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.ids[colPos]); + } + + protected void load(StructInstance instance, int colPos, Object val) { + instance.ids[colPos] = (Id) val; + } + + } + + static class ImmutableListStore extends ObjectAttributeStore<ImmutableList> { + + public ImmutableListStore(AttributeInfo attrInfo) { + super(ImmutableList.class, attrInfo); + } + + protected void store(StructInstance instance, int colPos, int pos) { + list.set(pos, instance.arrays[colPos]); + } + + protected void load(StructInstance instance, int colPos, int pos) { + instance.arrays[colPos] = list.get(pos); + } + + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.arrays[colPos]); + } + + protected void load(StructInstance instance, int colPos, Object val) { + instance.arrays[colPos] = (ImmutableList) val; + } + + } + + static class ImmutableMapStore extends ObjectAttributeStore<ImmutableMap> { + + public ImmutableMapStore(AttributeInfo attrInfo) { + super(ImmutableMap.class, attrInfo); + } + + protected void store(StructInstance instance, int colPos, int pos) { + list.set(pos, instance.maps[colPos]); + } + + protected void load(StructInstance instance, int colPos, int pos) { + instance.maps[colPos] = list.get(pos); + } + + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.maps[colPos]); + } + + protected void load(StructInstance instance, int colPos, Object val) { + instance.maps[colPos] = (ImmutableMap) val; + } + + } +}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/memory/ClassStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/memory/ClassStore.java b/repository/src/main/java/org/apache/atlas/repository/memory/ClassStore.java new file mode 100755 index 0000000..a4933ff --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/memory/ClassStore.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.memory; + +import com.google.common.collect.ImmutableList; +import org.apache.atlas.MetadataException; +import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.persistence.ReferenceableInstance; +import org.apache.atlas.typesystem.types.ClassType; + +import java.util.ArrayList; + +public class ClassStore extends HierarchicalTypeStore { + + final ArrayList<ImmutableList<String>> traitNamesStore; + final ClassType classType; + + public ClassStore(MemRepository repository, ClassType hierarchicalType) + throws RepositoryException { + super(repository, hierarchicalType); + classType = hierarchicalType; + traitNamesStore = new ArrayList<ImmutableList<String>>(); + } + + void store(ReferenceableInstance i) throws RepositoryException { + super.store(i); + int pos = idPosMap.get(i.getId()); + traitNamesStore.set(pos, i.getTraits()); + } + + public void ensureCapacity(int pos) throws RepositoryException { + super.ensureCapacity(pos); + while (traitNamesStore.size() < pos + 1) { + traitNamesStore.add(null); + } + } + + boolean validate(MemRepository repo, Id id) throws RepositoryException { + if (id.isUnassigned()) { + throw new RepositoryException(String.format("Invalid Id (unassigned) : %s", id)); + } + Integer pos = idPosMap.get(id); + if (pos == null) { + throw new RepositoryException(String.format("Invalid Id (unknown) : %s", id)); + } + + String typeName = typeNameList.get(pos); + if (typeName != hierarchicalType.getName()) { + throw new RepositoryException( + String.format("Invalid Id (incorrect typeName, type is %s) : %s", + typeName, id)); + } + + return true; + } + + /* + * - assumes id is already validated + */ + ReferenceableInstance createInstance(MemRepository repo, Id id) throws RepositoryException { + Integer pos = idPosMap.get(id); + String typeName = typeNameList.get(pos); + if (typeName != hierarchicalType.getName()) { + return repo.getClassStore(typeName).createInstance(repo, id); + } + + ImmutableList<String> traitNames = traitNamesStore.get(pos); + String[] tNs = traitNames.toArray(new String[]{}); + + try { + ReferenceableInstance r = (ReferenceableInstance) classType.createInstance(id, tNs); + return r; + } catch (MetadataException me) { + throw new RepositoryException(me); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/memory/HierarchicalTypeStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/memory/HierarchicalTypeStore.java b/repository/src/main/java/org/apache/atlas/repository/memory/HierarchicalTypeStore.java new file mode 100755 index 0000000..c9e245b --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/memory/HierarchicalTypeStore.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.memory; + +import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.persistence.ReferenceableInstance; +import org.apache.atlas.typesystem.persistence.StructInstance; +import org.apache.atlas.typesystem.types.AttributeInfo; +import org.apache.atlas.typesystem.types.HierarchicalType; +import org.apache.atlas.typesystem.types.IConstructableType; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public abstract class HierarchicalTypeStore { + + final MemRepository repository; + final IConstructableType hierarchicalType; + final ArrayList<String> typeNameList; + final ImmutableMap<AttributeInfo, IAttributeStore> attrStores; + final ImmutableList<HierarchicalTypeStore> superTypeStores; + + + /** + * Map Id to position in storage lists. + */ + Map<Id, Integer> idPosMap; + + List<Integer> freePositions; + + int nextPos; + + /** + * Lock for each Class/Trait. + */ + ReentrantReadWriteLock lock; + + HierarchicalTypeStore(MemRepository repository, HierarchicalType hierarchicalType) + throws RepositoryException { + this.hierarchicalType = (IConstructableType) hierarchicalType; + this.repository = repository; + ImmutableMap.Builder<AttributeInfo, IAttributeStore> b + = new ImmutableBiMap.Builder<AttributeInfo, + IAttributeStore>(); + typeNameList = Lists.newArrayList((String) null); + ImmutableList<AttributeInfo> l = hierarchicalType.immediateAttrs; + for (AttributeInfo i : l) { + b.put(i, AttributeStores.createStore(i)); + } + attrStores = b.build(); + + ImmutableList.Builder<HierarchicalTypeStore> b1 + = new ImmutableList.Builder<HierarchicalTypeStore>(); + Set<String> allSuperTypeNames = hierarchicalType.getAllSuperTypeNames(); + for (String s : allSuperTypeNames) { + b1.add(repository.getStore(s)); + } + superTypeStores = b1.build(); + + nextPos = 0; + idPosMap = new HashMap<Id, Integer>(); + freePositions = new ArrayList<Integer>(); + + lock = new ReentrantReadWriteLock(); + } + + /** + * Assign a storage position to an Id. + * - try to assign from freePositions + * - ensure storage capacity. + * - add entry in idPosMap. + * @param id + * @return + * @throws RepositoryException + */ + int assignPosition(Id id) throws RepositoryException { + + int pos = -1; + if (!freePositions.isEmpty()) { + pos = freePositions.remove(0); + } else { + pos = nextPos++; + ensureCapacity(pos); + } + + idPosMap.put(id, pos); + + for (HierarchicalTypeStore s : superTypeStores) { + s.assignPosition(id); + } + + return pos; + } + + /** + * - remove from idPosMap + * - add to freePositions. + * @throws RepositoryException + */ + void releaseId(Id id) { + + Integer pos = idPosMap.get(id); + if (pos != null) { + idPosMap.remove(id); + freePositions.add(pos); + + for (HierarchicalTypeStore s : superTypeStores) { + s.releaseId(id); + } + } + } + + void acquireReadLock() { + lock.readLock().lock(); + } + + void acquireWriteLock() { + lock.writeLock().lock(); + } + + void releaseReadLock() { + lock.readLock().unlock(); + } + + void releaseWriteLock() { + lock.writeLock().unlock(); + } + + protected void storeFields(int pos, StructInstance s) throws RepositoryException { + for (Map.Entry<AttributeInfo, IAttributeStore> e : attrStores.entrySet()) { + IAttributeStore attributeStore = e.getValue(); + attributeStore.store(pos, hierarchicalType, s); + } + } + + protected void loadFields(int pos, StructInstance s) throws RepositoryException { + for (Map.Entry<AttributeInfo, IAttributeStore> e : attrStores.entrySet()) { + IAttributeStore attributeStore = e.getValue(); + attributeStore.load(pos, hierarchicalType, s); + } + } + + /** + * - store the typeName + * - store the immediate attributes in the respective IAttributeStore + * - call store on each SuperType. + * @param i + * @throws RepositoryException + */ + void store(ReferenceableInstance i) throws RepositoryException { + int pos = idPosMap.get(i.getId()); + typeNameList.set(pos, i.getTypeName()); + storeFields(pos, i); + + for (HierarchicalTypeStore s : superTypeStores) { + s.store(i); + } + } + + /** + * - copy over the immediate attribute values from the respective IAttributeStore + * - call load on each SuperType. + * @param i + * @throws RepositoryException + */ + void load(ReferenceableInstance i) throws RepositoryException { + int pos = idPosMap.get(i.getId()); + loadFields(pos, i); + + for (HierarchicalTypeStore s : superTypeStores) { + s.load(i); + } + } + + public void ensureCapacity(int pos) throws RepositoryException { + while (typeNameList.size() < pos + 1) { + typeNameList.add(null); + } + for (Map.Entry<AttributeInfo, IAttributeStore> e : attrStores.entrySet()) { + IAttributeStore attributeStore = e.getValue(); + attributeStore.ensureCapacity(pos); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/memory/IAttributeStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/memory/IAttributeStore.java b/repository/src/main/java/org/apache/atlas/repository/memory/IAttributeStore.java new file mode 100755 index 0000000..cce239b --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/memory/IAttributeStore.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.memory; + +import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.typesystem.persistence.StructInstance; +import org.apache.atlas.typesystem.types.IConstructableType; + +public interface IAttributeStore { + /** + * Store the attribute's value from the 'instance' into this store. + * @param pos + * @param instance + * @throws RepositoryException + */ + void store(int pos, IConstructableType type, StructInstance instance) + throws RepositoryException; + + /** + * load the Instance with the value from position 'pos' for the attribute. + * @param pos + * @param instance + * @throws RepositoryException + */ + void load(int pos, IConstructableType type, StructInstance instance) throws RepositoryException; + + /** + * Ensure store have space for the given pos. + * @param pos + * @throws RepositoryException + */ + void ensureCapacity(int pos) throws RepositoryException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/memory/MemRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/memory/MemRepository.java b/repository/src/main/java/org/apache/atlas/repository/memory/MemRepository.java new file mode 100755 index 0000000..0383377 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/memory/MemRepository.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.memory; + +import org.apache.atlas.MetadataException; +import org.apache.atlas.repository.DiscoverInstances; +import org.apache.atlas.repository.IRepository; +import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.persistence.MapIds; +import org.apache.atlas.typesystem.persistence.ReferenceableInstance; +import org.apache.atlas.typesystem.types.ClassType; +import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.HierarchicalType; +import org.apache.atlas.typesystem.types.Multiplicity; +import org.apache.atlas.typesystem.types.ObjectGraphWalker; +import org.apache.atlas.typesystem.types.TraitType; +import org.apache.atlas.typesystem.types.TypeSystem; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; + +public class MemRepository implements IRepository { + +/* + public static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + public static SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd"); +*/ + + final TypeSystem typeSystem; + /* + * A Store for each Class and Trait. + */ + final Map<String, HierarchicalTypeStore> typeStores; + final AtomicInteger ID_SEQ = new AtomicInteger(0); + + public MemRepository(TypeSystem typeSystem) { + this.typeSystem = typeSystem; + this.typeStores = new HashMap<>(); + } + +/* + @Override + public DateFormat getDateFormat() { + return dateFormat; + } + + @Override + public DateFormat getTimestampFormat() { + return timestampFormat; + } + + @Override + public boolean allowNullsInCollections() { + return false; + } +*/ + + @Override + public Id newId(String typeName) { + return new Id("" + ID_SEQ.incrementAndGet(), 0, typeName); + } + + /** + * 1. traverse the Object Graph from i and create idToNewIdMap : Map[Id, Id], + * also create old Id to Instance Map: oldIdToInstance : Map[Id, IInstance] + * - traverse reference Attributes, List[ClassType], Maps where Key/value is ClassType + * - traverse Structs + * - traverse Traits. + * 1b. Ensure that every newId has an associated Instance. + * 2. Traverse oldIdToInstance map create newInstances : List[ITypedReferenceableInstance] + * - create a ITypedReferenceableInstance. + * replace any old References ( ids or object references) with new Ids. + * 3. Traverse over newInstances + * - ask ClassStore to assign a position to the Id. + * - for Instances with Traits, assign a position for each Trait + * - invoke store on the nwInstance. + * + * Recovery: + * - on each newInstance, invoke releaseId and delete on its ClassStore and Traits' Stores. + * + * @param i + * @return + * @throws org.apache.atlas.repository.RepositoryException + */ + public ITypedReferenceableInstance create(IReferenceableInstance i) throws RepositoryException { + + DiscoverInstances discoverInstances = new DiscoverInstances(this); + + /* + * Step 1: traverse the Object Graph from i and create idToNewIdMap : Map[Id, Id], + * also create old Id to Instance Map: oldIdToInstance : Map[Id, IInstance] + * - traverse reference Attributes, List[ClassType], Maps where Key/value is ClassType + * - traverse Structs + * - traverse Traits. + */ + try { + new ObjectGraphWalker(typeSystem, discoverInstances, i).walk(); + } catch (MetadataException me) { + throw new RepositoryException("TypeSystem error when walking the ObjectGraph", me); + } + + /* + * Step 1b: Ensure that every newId has an associated Instance. + */ + for (Id oldId : discoverInstances.idToNewIdMap.keySet()) { + if (!discoverInstances.idToInstanceMap.containsKey(oldId)) { + throw new RepositoryException(String.format("Invalid Object Graph: " + + "Encountered an unassignedId %s that is not associated with an Instance", + oldId)); + } + } + + /* Step 2: Traverse oldIdToInstance map create newInstances : + List[ITypedReferenceableInstance] + * - create a ITypedReferenceableInstance. + * replace any old References ( ids or object references) with new Ids. + */ + List<ITypedReferenceableInstance> newInstances + = new ArrayList<ITypedReferenceableInstance>(); + ITypedReferenceableInstance retInstance = null; + Set<ClassType> classTypes = new TreeSet<ClassType>(); + Set<TraitType> traitTypes = new TreeSet<TraitType>(); + for (IReferenceableInstance transientInstance : discoverInstances.idToInstanceMap + .values()) { + try { + ClassType cT = typeSystem + .getDataType(ClassType.class, transientInstance.getTypeName()); + ITypedReferenceableInstance newInstance = cT + .convert(transientInstance, Multiplicity.REQUIRED); + newInstances.add(newInstance); + + classTypes.add(cT); + for (String traitName : newInstance.getTraits()) { + TraitType tT = typeSystem.getDataType(TraitType.class, traitName); + traitTypes.add(tT); + } + + if (newInstance.getId() == i.getId()) { + retInstance = newInstance; + } + + /* + * Now replace old references with new Ids + */ + MapIds mapIds = new MapIds(discoverInstances.idToNewIdMap); + new ObjectGraphWalker(typeSystem, mapIds, newInstances).walk(); + + } catch (MetadataException me) { + throw new RepositoryException( + String.format("Failed to create Instance(id = %s", + transientInstance.getId()), me); + } + } + + /* + * 3. Acquire Class and Trait Storage locks. + * - acquire them in a stable order (super before subclass, classes before traits + */ + for (ClassType cT : classTypes) { + HierarchicalTypeStore st = typeStores.get(cT.getName()); + st.acquireWriteLock(); + } + + for (TraitType tT : traitTypes) { + HierarchicalTypeStore st = typeStores.get(tT.getName()); + st.acquireWriteLock(); + } + + + /* + * 4. Traverse over newInstances + * - ask ClassStore to assign a position to the Id. + * - for Instances with Traits, assign a position for each Trait + * - invoke store on the nwInstance. + */ + try { + for (ITypedReferenceableInstance instance : newInstances) { + HierarchicalTypeStore st = typeStores.get(instance.getTypeName()); + st.assignPosition(instance.getId()); + for (String traitName : instance.getTraits()) { + HierarchicalTypeStore tt = typeStores.get(traitName); + tt.assignPosition(instance.getId()); + } + } + + for (ITypedReferenceableInstance instance : newInstances) { + HierarchicalTypeStore st = typeStores.get(instance.getTypeName()); + st.store((ReferenceableInstance) instance); + for (String traitName : instance.getTraits()) { + HierarchicalTypeStore tt = typeStores.get(traitName); + tt.store((ReferenceableInstance) instance); + } + } + } catch (RepositoryException re) { + for (ITypedReferenceableInstance instance : newInstances) { + HierarchicalTypeStore st = typeStores.get(instance.getTypeName()); + st.releaseId(instance.getId()); + } + throw re; + } finally { + for (ClassType cT : classTypes) { + HierarchicalTypeStore st = typeStores.get(cT.getName()); + st.releaseWriteLock(); + } + + for (TraitType tT : traitTypes) { + HierarchicalTypeStore st = typeStores.get(tT.getName()); + st.releaseWriteLock(); + } + } + + return retInstance; + } + + public ITypedReferenceableInstance update(ITypedReferenceableInstance i) + throws RepositoryException { + throw new RepositoryException("not implemented"); + } + + public void delete(ITypedReferenceableInstance i) throws RepositoryException { + throw new RepositoryException("not implemented"); + } + + public ITypedReferenceableInstance get(Id id) throws RepositoryException { + + try { + ReplaceIdWithInstance replacer = new ReplaceIdWithInstance(this); + ObjectGraphWalker walker = new ObjectGraphWalker(typeSystem, replacer); + replacer.setWalker(walker); + ITypedReferenceableInstance r = getDuringWalk(id, walker); + walker.walk(); + return r; + } catch (MetadataException me) { + throw new RepositoryException("TypeSystem error when walking the ObjectGraph", me); + } + } + + /* + * - Id must be valid; Class must be valid. + * - Ask ClassStore to createInstance. + * - Ask ClassStore to load instance. + * - load instance traits + * - add to GraphWalker + */ + ITypedReferenceableInstance getDuringWalk(Id id, ObjectGraphWalker walker) + throws RepositoryException { + ClassStore cS = getClassStore(id.getTypeName()); + if (cS == null) { + throw new RepositoryException(String.format("Unknown Class %s", id.getTypeName())); + } + cS.validate(this, id); + ReferenceableInstance r = cS.createInstance(this, id); + cS.load(r); + for (String traitName : r.getTraits()) { + HierarchicalTypeStore tt = typeStores.get(traitName); + tt.load(r); + } + + walker.addRoot(r); + return r; + } + + HierarchicalTypeStore getStore(String typeName) { + return typeStores.get(typeName); + } + + ClassStore getClassStore(String typeName) { + return (ClassStore) getStore(typeName); + } + + public void defineClass(ClassType type) throws RepositoryException { + HierarchicalTypeStore s = new ClassStore(this, type); + typeStores.put(type.getName(), s); + } + + public void defineTrait(TraitType type) throws RepositoryException { + HierarchicalTypeStore s = new TraitStore(this, type); + typeStores.put(type.getName(), s); + } + + public void defineTypes(List<HierarchicalType> types) throws RepositoryException { + List<TraitType> tTypes = new ArrayList<TraitType>(); + List<ClassType> cTypes = new ArrayList<ClassType>(); + + for (HierarchicalType h : types) { + if (h.getTypeCategory() == DataTypes.TypeCategory.TRAIT) { + tTypes.add((TraitType) h); + } else { + cTypes.add((ClassType) h); + } + } + + Collections.sort(tTypes); + Collections.sort(cTypes); + + for (TraitType tT : tTypes) { + defineTrait(tT); + } + + for (ClassType cT : cTypes) { + defineClass(cT); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/memory/ReplaceIdWithInstance.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/memory/ReplaceIdWithInstance.java b/repository/src/main/java/org/apache/atlas/repository/memory/ReplaceIdWithInstance.java new file mode 100755 index 0000000..3fa97e5 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/memory/ReplaceIdWithInstance.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.memory; + +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.atlas.MetadataException; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.Multiplicity; +import org.apache.atlas.typesystem.types.ObjectGraphWalker; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +public class ReplaceIdWithInstance implements ObjectGraphWalker.NodeProcessor { + + public final Map<Id, ITypedReferenceableInstance> idToInstanceMap; + final MemRepository repository; + ObjectGraphWalker walker; + + public ReplaceIdWithInstance(MemRepository repository) { + this.repository = repository; + idToInstanceMap = new HashMap<>(); + } + + void setWalker(ObjectGraphWalker walker) { + this.walker = walker; + } + + @Override + public void processNode(ObjectGraphWalker.Node nd) throws MetadataException { + if (nd.attributeName == null) { + // do nothing + } else if (!nd.aInfo.isComposite || nd.value == null) { + // do nothing + } else if (nd.aInfo.dataType().getTypeCategory() == DataTypes.TypeCategory.CLASS) { + + if (nd.value != null && nd.value instanceof Id) { + Id id = (Id) nd.value; + ITypedReferenceableInstance r = getInstance(id); + nd.instance.set(nd.attributeName, r); + } + } else if (nd.aInfo.dataType().getTypeCategory() == DataTypes.TypeCategory.ARRAY) { + DataTypes.ArrayType aT = (DataTypes.ArrayType) nd.aInfo.dataType(); + nd.instance.set(nd.attributeName, + convertToInstances((ImmutableCollection) nd.value, nd.aInfo.multiplicity, aT)); + } else if (nd.aInfo.dataType().getTypeCategory() == DataTypes.TypeCategory.MAP) { + DataTypes.MapType mT = (DataTypes.MapType) nd.aInfo.dataType(); + nd.instance.set(nd.attributeName, + convertToInstances((ImmutableMap) nd.value, nd.aInfo.multiplicity, mT)); + } + } + + ImmutableCollection<?> convertToInstances(ImmutableCollection<?> val, + Multiplicity m, DataTypes.ArrayType arrType) + throws MetadataException { + + if (val == null || + arrType.getElemType().getTypeCategory() != DataTypes.TypeCategory.CLASS) { + return val; + } + + ImmutableCollection.Builder b = m.isUnique ? ImmutableSet.builder() + : ImmutableList.builder(); + Iterator it = val.iterator(); + while (it.hasNext()) { + Object elem = it.next(); + if (elem instanceof Id) { + Id id = (Id) elem; + elem = getInstance(id); + } + + b.add(elem); + + } + return b.build(); + } + + ImmutableMap<?, ?> convertToInstances(ImmutableMap val, Multiplicity m, + DataTypes.MapType mapType) + throws MetadataException { + + if (val == null || + (mapType.getKeyType().getTypeCategory() != DataTypes.TypeCategory.CLASS && + mapType.getValueType().getTypeCategory() != DataTypes.TypeCategory.CLASS)) { + return val; + } + ImmutableMap.Builder b = ImmutableMap.builder(); + Iterator<Map.Entry> it = val.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry elem = it.next(); + Object oldKey = elem.getKey(); + Object oldValue = elem.getValue(); + Object newKey = oldKey; + Object newValue = oldValue; + + if (oldKey instanceof Id) { + Id id = (Id) elem; + ITypedReferenceableInstance r = getInstance(id); + } + + if (oldValue instanceof Id) { + Id id = (Id) elem; + ITypedReferenceableInstance r = getInstance(id); + } + + b.put(newKey, newValue); + } + return b.build(); + } + + ITypedReferenceableInstance getInstance(Id id) throws MetadataException { + + ITypedReferenceableInstance r = idToInstanceMap.get(id); + if (r == null) { + r = repository.get(id); + idToInstanceMap.put(id, r); + walker.addRoot(r); + } + return r; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/memory/StructStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/memory/StructStore.java b/repository/src/main/java/org/apache/atlas/repository/memory/StructStore.java new file mode 100755 index 0000000..fb84e13 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/memory/StructStore.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.memory; + +import com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableMap; +import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.typesystem.persistence.StructInstance; +import org.apache.atlas.typesystem.types.AttributeInfo; +import org.apache.atlas.typesystem.types.StructType; + +import java.util.Collection; +import java.util.Map; + +public class StructStore extends AttributeStores.AbstractAttributeStore implements IAttributeStore { + + final StructType structType; + final ImmutableMap<AttributeInfo, IAttributeStore> attrStores; + + StructStore(AttributeInfo aInfo) throws RepositoryException { + super(aInfo); + this.structType = (StructType) aInfo.dataType(); + ImmutableMap.Builder<AttributeInfo, IAttributeStore> b = new ImmutableBiMap.Builder<>(); + Collection<AttributeInfo> l = structType.fieldMapping.fields.values(); + for (AttributeInfo i : l) { + b.put(i, AttributeStores.createStore(i)); + } + attrStores = b.build(); + + } + + @Override + protected void store(StructInstance instance, int colPos, int pos) throws RepositoryException { + StructInstance s = instance.structs[colPos]; + for (Map.Entry<AttributeInfo, IAttributeStore> e : attrStores.entrySet()) { + IAttributeStore attributeStore = e.getValue(); + attributeStore.store(pos, structType, s); + } + } + + @Override + protected void load(StructInstance instance, int colPos, int pos) throws RepositoryException { + StructInstance s = (StructInstance) structType.createInstance(); + instance.structs[colPos] = s; + for (Map.Entry<AttributeInfo, IAttributeStore> e : attrStores.entrySet()) { + IAttributeStore attributeStore = e.getValue(); + attributeStore.load(pos, structType, s); + } + } + + @Override + protected void store(StructInstance instance, int colPos, String attrName, + Map<String, Object> m) { + m.put(attrName, instance.structs[colPos]); + } + + @Override + protected void load(StructInstance instance, int colPos, Object val) { + instance.structs[colPos] = (StructInstance) val; + } + + @Override + public void ensureCapacity(int pos) throws RepositoryException { + for (Map.Entry<AttributeInfo, IAttributeStore> e : attrStores.entrySet()) { + IAttributeStore attributeStore = e.getValue(); + attributeStore.ensureCapacity(pos); + } + nullList.size(pos + 1); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/memory/TraitStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/memory/TraitStore.java b/repository/src/main/java/org/apache/atlas/repository/memory/TraitStore.java new file mode 100755 index 0000000..cd9c8ee --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/memory/TraitStore.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.memory; + +import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.typesystem.persistence.ReferenceableInstance; +import org.apache.atlas.typesystem.persistence.StructInstance; +import org.apache.atlas.typesystem.types.TraitType; + +import java.util.ArrayList; + +public class TraitStore extends HierarchicalTypeStore { + + final ArrayList<String> classNameStore; + + public TraitStore(MemRepository repository, TraitType hierarchicalType) + throws RepositoryException { + super(repository, hierarchicalType); + classNameStore = new ArrayList<>(); + } + + void store(ReferenceableInstance i) throws RepositoryException { + int pos = idPosMap.get(i.getId()); + StructInstance s = (StructInstance) i.getTrait(hierarchicalType.getName()); + super.storeFields(pos, s); + classNameStore.set(pos, i.getTypeName()); + } + + void load(ReferenceableInstance i) throws RepositoryException { + int pos = idPosMap.get(i.getId()); + StructInstance s = (StructInstance) i.getTrait(hierarchicalType.getName()); + super.loadFields(pos, s); + } + + public void ensureCapacity(int pos) throws RepositoryException { + super.ensureCapacity(pos); + while (classNameStore.size() < pos + 1) { + classNameStore.add(null); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java b/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java new file mode 100755 index 0000000..0c46b40 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.typestore; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.thinkaurelius.titan.core.TitanGraph; +import com.tinkerpop.blueprints.Direction; +import com.tinkerpop.blueprints.Edge; +import com.tinkerpop.blueprints.Vertex; +import org.apache.atlas.GraphTransaction; +import org.apache.atlas.MetadataException; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graph.GraphProvider; +import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.types.AttributeDefinition; +import org.apache.atlas.typesystem.types.AttributeInfo; +import org.apache.atlas.typesystem.types.ClassType; +import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.EnumType; +import org.apache.atlas.typesystem.types.EnumTypeDefinition; +import org.apache.atlas.typesystem.types.EnumValue; +import org.apache.atlas.typesystem.types.HierarchicalType; +import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition; +import org.apache.atlas.typesystem.types.IDataType; +import org.apache.atlas.typesystem.types.StructType; +import org.apache.atlas.typesystem.types.StructTypeDefinition; +import org.apache.atlas.typesystem.types.TraitType; +import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.atlas.typesystem.types.TypeUtils; +import org.codehaus.jettison.json.JSONException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +@Singleton +public class GraphBackedTypeStore implements ITypeStore { + public static final String VERTEX_TYPE = "typeSystem"; + private static final String PROPERTY_PREFIX = Constants.INTERNAL_PROPERTY_KEY_PREFIX + "type."; + public static final String SUPERTYPE_EDGE_LABEL = PROPERTY_PREFIX + ".supertype"; + + private static Logger LOG = LoggerFactory.getLogger(GraphBackedTypeStore.class); + + private final TitanGraph titanGraph; + + @Inject + public GraphBackedTypeStore(GraphProvider<TitanGraph> graphProvider) { + titanGraph = graphProvider.get(); + } + + @Override + public void store(TypeSystem typeSystem) throws MetadataException { + store(typeSystem, ImmutableList.copyOf(typeSystem.getTypeNames())); + } + + @Override + public void store(TypeSystem typeSystem, ImmutableList<String> typeNames) throws MetadataException { + ImmutableList<String> coreTypes = typeSystem.getCoreTypes(); + for (String typeName : typeNames) { + if (!coreTypes.contains(typeName)) { + IDataType dataType = typeSystem.getDataType(IDataType.class, typeName); + LOG.debug("Processing {}.{} in type store", dataType.getTypeCategory(), dataType.getName()); + switch (dataType.getTypeCategory()) { + case ENUM: + storeInGraph((EnumType)dataType); + break; + + case STRUCT: + StructType structType = (StructType) dataType; + storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(), + ImmutableList.copyOf(structType.infoToNameMap.keySet()), ImmutableList.<String>of()); + break; + + case TRAIT: + case CLASS: + HierarchicalType type = (HierarchicalType) dataType; + storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(), + type.immediateAttrs, type.superTypes); + break; + + default: //Ignore primitive/collection types as they are covered under references + break; + } + } + } + } + + private void addProperty(Vertex vertex, String propertyName, Object value) { + LOG.debug("Setting property {} = \"{}\" to vertex {}", propertyName, value, vertex); + vertex.setProperty(propertyName, value); + } + + private void storeInGraph(EnumType dataType) { + Vertex vertex = createVertex(dataType.getTypeCategory(), dataType.getName()); + List<String> values = new ArrayList<>(dataType.values().size()); + for (EnumValue enumValue : dataType.values()) { + String key = getPropertyKey(dataType.getName(), enumValue.value); + addProperty(vertex, key, enumValue.ordinal); + values.add(enumValue.value); + } + addProperty(vertex, getPropertyKey(dataType.getName()), values); + } + + private String getPropertyKey(String name) { + return PROPERTY_PREFIX + name; + } + + private String getPropertyKey(String parent, String child) { + return PROPERTY_PREFIX + parent + "." + child; + } + + private String getEdgeLabel(String parent, String child) { + return PROPERTY_PREFIX + "edge." + parent + "." + child; + } + + private void storeInGraph(TypeSystem typeSystem, DataTypes.TypeCategory category, String typeName, + ImmutableList<AttributeInfo> attributes, ImmutableList<String> superTypes) throws MetadataException { + Vertex vertex = createVertex(category, typeName); + List<String> attrNames = new ArrayList<>(); + if (attributes != null) { + for (AttributeInfo attribute : attributes) { + String propertyKey = getPropertyKey(typeName, attribute.name); + try { + addProperty(vertex, propertyKey, attribute.toJson()); + } catch (JSONException e) { + throw new StorageException(typeName, e); + } + attrNames.add(attribute.name); + addReferencesForAttribute(typeSystem, vertex, attribute); + } + } + addProperty(vertex, getPropertyKey(typeName), attrNames); + + //Add edges for hierarchy + if (superTypes != null) { + for (String superTypeName : superTypes) { + HierarchicalType superType = typeSystem.getDataType(HierarchicalType.class, superTypeName); + Vertex superVertex = createVertex(superType.getTypeCategory(), superTypeName); + addEdge(vertex, superVertex, SUPERTYPE_EDGE_LABEL); + } + } + } + + //Add edges for complex attributes + private void addReferencesForAttribute(TypeSystem typeSystem, Vertex vertex, AttributeInfo attribute) throws MetadataException { + ImmutableList<String> coreTypes = typeSystem.getCoreTypes(); + List<IDataType> attrDataTypes = new ArrayList<>(); + IDataType attrDataType = attribute.dataType(); + String vertexTypeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY); + + switch (attrDataType.getTypeCategory()) { + case ARRAY: + String attrType = TypeUtils.parseAsArrayType(attrDataType.getName()); + IDataType elementType = typeSystem.getDataType(IDataType.class, attrType); + attrDataTypes.add(elementType); + break; + + case MAP: + String[] attrTypes = TypeUtils.parseAsMapType(attrDataType.getName()); + IDataType keyType = typeSystem.getDataType(IDataType.class, attrTypes[0]); + IDataType valueType = typeSystem.getDataType(IDataType.class, attrTypes[1]); + attrDataTypes.add(keyType); + attrDataTypes.add(valueType); + break; + + case ENUM: + case STRUCT: + case CLASS: + attrDataTypes.add(attrDataType); + break; + + case PRIMITIVE: //no vertex for primitive type, hence no edge required + break; + + default: + throw new IllegalArgumentException("Attribute cannot reference instances of type : " + attrDataType.getTypeCategory()); + } + + for (IDataType attrType : attrDataTypes) { + if (!coreTypes.contains(attrType.getName())) { + Vertex attrVertex = createVertex(attrType.getTypeCategory(), attrType.getName()); + String label = getEdgeLabel(vertexTypeName, attribute.name); + addEdge(vertex, attrVertex, label); + } + } + } + + private void addEdge(Vertex fromVertex, Vertex toVertex, String label) { + LOG.debug("Adding edge from {} to {} with label {}" + toString(fromVertex), toString(toVertex), label); + titanGraph.addEdge(null, fromVertex, toVertex, label); + } + + @Override + @GraphTransaction + public TypesDef restore() throws MetadataException { + //Get all vertices for type system + Iterator vertices = + titanGraph.query().has(Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE).vertices().iterator(); + + ImmutableList.Builder<EnumTypeDefinition> enums = ImmutableList.builder(); + ImmutableList.Builder<StructTypeDefinition> structs = ImmutableList.builder(); + ImmutableList.Builder<HierarchicalTypeDefinition<ClassType>> classTypes = ImmutableList.builder(); + ImmutableList.Builder<HierarchicalTypeDefinition<TraitType>> traits = ImmutableList.builder(); + + while (vertices.hasNext()) { + Vertex vertex = (Vertex) vertices.next(); + DataTypes.TypeCategory typeCategory = vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY); + String typeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY); + LOG.info("Restoring type {}.{}", typeCategory, typeName); + switch (typeCategory) { + case ENUM: + enums.add(getEnumType(vertex)); + break; + + case STRUCT: + AttributeDefinition[] attributes = getAttributes(vertex, typeName); + structs.add(new StructTypeDefinition(typeName, attributes)); + break; + + case CLASS: + ImmutableList<String> superTypes = getSuperTypes(vertex); + attributes = getAttributes(vertex, typeName); + classTypes.add(new HierarchicalTypeDefinition(ClassType.class, typeName, superTypes, attributes)); + break; + + case TRAIT: + superTypes = getSuperTypes(vertex); + attributes = getAttributes(vertex, typeName); + traits.add(new HierarchicalTypeDefinition(TraitType.class, typeName, superTypes, attributes)); + break; + + default: + throw new IllegalArgumentException("Unhandled type category " + typeCategory); + } + } + return TypeUtils.getTypesDef(enums.build(), structs.build(), traits.build(), classTypes.build()); + } + + private EnumTypeDefinition getEnumType(Vertex vertex) { + String typeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY); + List<EnumValue> enumValues = new ArrayList<>(); + List<String> values = vertex.getProperty(getPropertyKey(typeName)); + for (String value : values) { + String valueProperty = getPropertyKey(typeName, value); + enumValues.add(new EnumValue(value, vertex.<Integer>getProperty(valueProperty))); + } + return new EnumTypeDefinition(typeName, enumValues.toArray(new EnumValue[enumValues.size()])); + } + + private ImmutableList<String> getSuperTypes(Vertex vertex) { + List<String> superTypes = new ArrayList<>(); + Iterator<Edge> edges = vertex.getEdges(Direction.OUT, SUPERTYPE_EDGE_LABEL).iterator(); + while (edges.hasNext()) { + Edge edge = edges.next(); + superTypes.add((String) edge.getVertex(Direction.IN).getProperty(Constants.TYPENAME_PROPERTY_KEY)); + } + return ImmutableList.copyOf(superTypes); + } + + private AttributeDefinition[] getAttributes(Vertex vertex, String typeName) throws MetadataException { + List<AttributeDefinition> attributes = new ArrayList<>(); + List<String> attrNames = vertex.getProperty(getPropertyKey(typeName)); + if (attrNames != null) { + for (String attrName : attrNames) { + try { + String propertyKey = getPropertyKey(typeName, attrName); + attributes.add(AttributeInfo.fromJson((String) vertex.getProperty(propertyKey))); + } catch (JSONException e) { + throw new MetadataException(e); + } + } + } + return attributes.toArray(new AttributeDefinition[attributes.size()]); + } + + private String toString(Vertex vertex) { + return PROPERTY_PREFIX + "." + vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY); + } + + /** + * Find vertex for the given type category and name, else create new vertex + * @param category + * @param typeName + * @return vertex + */ + private Vertex findVertex(DataTypes.TypeCategory category, String typeName) { + LOG.debug("Finding vertex for {}.{}", category, typeName); + + Iterator results = titanGraph.query().has(Constants.TYPENAME_PROPERTY_KEY, typeName).vertices().iterator(); + Vertex vertex = null; + if (results != null && results.hasNext()) { + //There should be just one vertex with the given typeName + vertex = (Vertex) results.next(); + } + return vertex; + } + + private Vertex createVertex(DataTypes.TypeCategory category, String typeName) { + Vertex vertex = findVertex(category, typeName); + if (vertex == null) { + LOG.debug("Adding vertex {}{}", PROPERTY_PREFIX, typeName); + vertex = titanGraph.addVertex(null); + addProperty(vertex, Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE); //Mark as type vertex + addProperty(vertex, Constants.TYPE_CATEGORY_PROPERTY_KEY, category); + addProperty(vertex, Constants.TYPENAME_PROPERTY_KEY, typeName); + } + return vertex; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/typestore/ITypeStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/typestore/ITypeStore.java b/repository/src/main/java/org/apache/atlas/repository/typestore/ITypeStore.java new file mode 100755 index 0000000..39f0160 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/typestore/ITypeStore.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.typestore; + +import com.google.common.collect.ImmutableList; +import org.apache.atlas.MetadataException; +import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.types.TypeSystem; + +public interface ITypeStore { + /** + * Persist the entire type system - insert or update + * @param typeSystem type system to persist + * @throws StorageException + */ + void store(TypeSystem typeSystem) throws MetadataException; + + /** + * Persist the given type in the type system - insert or update + * @param typeSystem type system + * @param types types to persist + * @throws StorageException + */ + void store(TypeSystem typeSystem, ImmutableList<String> types) throws MetadataException; + + /** + * Restore all type definitions + * @return List of persisted type definitions + * @throws org.apache.atlas.MetadataException + */ + TypesDef restore() throws MetadataException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/atlas/repository/typestore/StorageException.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/typestore/StorageException.java b/repository/src/main/java/org/apache/atlas/repository/typestore/StorageException.java new file mode 100755 index 0000000..7b9c45c --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/typestore/StorageException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.typestore; + +import org.apache.atlas.MetadataException; + +public class StorageException extends MetadataException { + public StorageException(String type) { + super("Failure in typesystem storage for type " + type); + } + + public StorageException(String type, Throwable cause) { + super("Failure in typesystem storage for type " + type, cause); + } + + public StorageException(Throwable cause) { + super("Failure in type system storage", cause); + } +}
