Repository: incubator-gobblin Updated Branches: refs/heads/master 5457af88d -> f96379e11
[GOBBLIN-179] Add shim layer for Gobblin state Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/886c6e40 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/886c6e40 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/886c6e40 Branch: refs/heads/master Commit: 886c6e404976c1c152d5ed4af30c3404eee94f68 Parents: ee39a46 Author: Abhishek Tiwari <[email protected]> Authored: Mon Jul 31 05:46:53 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Mon Jul 31 05:46:53 2017 -0700 ---------------------------------------------------------------------- .../configuration/ImmutableWorkUnitState.java | 26 ++++++++ .../java/gobblin/configuration/SourceState.java | 52 ++++++++++++++++ .../main/java/gobblin/configuration/State.java | 36 +++++++++++ .../gobblin/configuration/WorkUnitState.java | 40 +++++++++++++ .../extractor/CheckpointableWatermark.java | 25 ++++++++ .../gobblin/source/extractor/Watermark.java | 23 +++++++ .../source/extractor/WatermarkInterval.java | 26 ++++++++ .../java/gobblin/source/workunit/Extract.java | 51 ++++++++++++++++ .../source/workunit/ImmutableExtract.java | 41 +++++++++++++ .../source/workunit/ImmutableWorkUnit.java | 26 ++++++++ .../gobblin/source/workunit/MultiWorkUnit.java | 27 +++++++++ .../java/gobblin/source/workunit/WorkUnit.java | 49 +++++++++++++++ .../main/java/gobblin/state/ConstructState.java | 36 +++++++++++ .../conversion/hive/source/HiveWorkUnit.java | 32 ++++++++++ .../main/java/gobblin/hive/HiveRegProps.java | 32 ++++++++++ .../gobblin/metastore/FsStateStoreTest.java | 40 ++++++------- .../runtime/CheckpointableWatermarkState.java | 33 ++++++++++ .../src/main/java/gobblin/runtime/JobState.java | 63 ++++++++++++++++++++ .../main/java/gobblin/runtime/TaskState.java | 5 ++ .../util/JobStateToJsonConverterTest.java | 34 +++++------ 20 files changed, 660 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-api/src/main/java/gobblin/configuration/ImmutableWorkUnitState.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/gobblin/configuration/ImmutableWorkUnitState.java b/gobblin-api/src/main/java/gobblin/configuration/ImmutableWorkUnitState.java new file mode 100644 index 0000000..7660ff2 --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/configuration/ImmutableWorkUnitState.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.configuration; + +/*** + * Shim layer for org.apache.gobblin.configuration.ImmutableWorkUnitState + */ +public class ImmutableWorkUnitState extends org.apache.gobblin.configuration.ImmutableWorkUnitState { + public ImmutableWorkUnitState(WorkUnitState workUnitState) { + super(workUnitState); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-api/src/main/java/gobblin/configuration/SourceState.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/gobblin/configuration/SourceState.java b/gobblin-api/src/main/java/gobblin/configuration/SourceState.java new file mode 100644 index 0000000..9a84fc2 --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/configuration/SourceState.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.configuration; + +import java.util.Map; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; + +/*** + * Shim layer for org.apache.gobblin.configuration.SourceState + */ +public class SourceState extends org.apache.gobblin.configuration.SourceState { + /** + * Default constructor. + */ + public SourceState() { + super(); + } + + public SourceState(State properties, Iterable<WorkUnitState> prevWorkUnitStates) { + super(properties, adaptWorkUnitStates(prevWorkUnitStates)); + } + + public SourceState(State properties, Map<String, ? extends SourceState> previousDatasetStatesByUrns, + Iterable<WorkUnitState> previousWorkUnitStates) { + super(properties, previousDatasetStatesByUrns, adaptWorkUnitStates(previousWorkUnitStates)); + } + + private static Iterable<org.apache.gobblin.configuration.WorkUnitState> adaptWorkUnitStates(Iterable<WorkUnitState> prevWorkUnitStates) { + return Iterables.transform(prevWorkUnitStates, new Function<WorkUnitState, org.apache.gobblin.configuration.WorkUnitState>() { + @Override + public org.apache.gobblin.configuration.WorkUnitState apply(WorkUnitState input) { + return input; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-api/src/main/java/gobblin/configuration/State.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/gobblin/configuration/State.java b/gobblin-api/src/main/java/gobblin/configuration/State.java new file mode 100644 index 0000000..2820ea0 --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/configuration/State.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.configuration; + +import java.util.Properties; + +/*** + * Shim layer for org.apache.gobblin.configuration.State + */ +public class State extends org.apache.gobblin.configuration.State { + public State() { + super(); + } + + public State(Properties properties) { + super(properties); + } + + public State(State otherState) { + super(otherState); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-api/src/main/java/gobblin/configuration/WorkUnitState.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/gobblin/configuration/WorkUnitState.java b/gobblin-api/src/main/java/gobblin/configuration/WorkUnitState.java new file mode 100644 index 0000000..1be342a --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/configuration/WorkUnitState.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.configuration; + +import org.apache.gobblin.source.workunit.WorkUnit; + +/*** + * Shim layer for org.apache.gobblin.configuration.WorkUnitState + */ +public class WorkUnitState extends org.apache.gobblin.configuration.WorkUnitState { + /** + * Default constructor used for deserialization. + */ + public WorkUnitState() { + super(); + } + + @Deprecated + public WorkUnitState(WorkUnit workUnit) { + super(workUnit); + } + + public WorkUnitState(WorkUnit workUnit, State jobState) { + super(workUnit, jobState); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-api/src/main/java/gobblin/source/extractor/CheckpointableWatermark.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/gobblin/source/extractor/CheckpointableWatermark.java b/gobblin-api/src/main/java/gobblin/source/extractor/CheckpointableWatermark.java new file mode 100644 index 0000000..0d46919 --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/source/extractor/CheckpointableWatermark.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package gobblin.source.extractor; + +/*** + * Shim layer for org.apache.gobblin.source.extractor.CheckpointableWatermark + */ +public interface CheckpointableWatermark extends org.apache.gobblin.source.extractor.CheckpointableWatermark { +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-api/src/main/java/gobblin/source/extractor/Watermark.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/gobblin/source/extractor/Watermark.java b/gobblin-api/src/main/java/gobblin/source/extractor/Watermark.java new file mode 100644 index 0000000..1059dda --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/source/extractor/Watermark.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.source.extractor; + +/*** + * Shim layer for org.apache.gobblin.source.extractor.Watermark + */ +public interface Watermark extends org.apache.gobblin.source.extractor.Watermark { +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-api/src/main/java/gobblin/source/extractor/WatermarkInterval.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/gobblin/source/extractor/WatermarkInterval.java b/gobblin-api/src/main/java/gobblin/source/extractor/WatermarkInterval.java new file mode 100644 index 0000000..b219ff1 --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/source/extractor/WatermarkInterval.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.source.extractor; + +/*** + * Shim layer for org.apache.gobblin.source.extractor.WatermarkInterval + */ +public class WatermarkInterval extends org.apache.gobblin.source.extractor.WatermarkInterval { + public WatermarkInterval(Watermark lowWatermark, Watermark expectedHighWatermark) { + super(lowWatermark, expectedHighWatermark); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-api/src/main/java/gobblin/source/workunit/Extract.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/gobblin/source/workunit/Extract.java b/gobblin-api/src/main/java/gobblin/source/workunit/Extract.java new file mode 100644 index 0000000..2eb9775 --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/source/workunit/Extract.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.source.workunit; + +import gobblin.configuration.SourceState; + +/*** + * Shim layer for org.apache.gobblin.source.workunit.Extract + */ +public class Extract extends org.apache.gobblin.source.workunit.Extract { + public enum TableType { + SNAPSHOT_ONLY, + SNAPSHOT_APPEND, + APPEND_ONLY + } + + @Deprecated + public Extract(SourceState state, TableType type, String namespace, String table) { + super(state, adaptTableType(type), namespace, table); + } + + public Extract(TableType type, String namespace, String table) { + super(adaptTableType(type), namespace, table); + } + + public Extract(Extract extract) { + super(extract); + } + + private static org.apache.gobblin.source.workunit.Extract.TableType adaptTableType(TableType type) { + switch (type) { + case SNAPSHOT_ONLY: return org.apache.gobblin.source.workunit.Extract.TableType.SNAPSHOT_ONLY; + case SNAPSHOT_APPEND: return org.apache.gobblin.source.workunit.Extract.TableType.SNAPSHOT_APPEND; + default: return org.apache.gobblin.source.workunit.Extract.TableType.APPEND_ONLY; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-api/src/main/java/gobblin/source/workunit/ImmutableExtract.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/gobblin/source/workunit/ImmutableExtract.java b/gobblin-api/src/main/java/gobblin/source/workunit/ImmutableExtract.java new file mode 100644 index 0000000..586096b --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/source/workunit/ImmutableExtract.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.source.workunit; + +import gobblin.configuration.SourceState; + +/*** + * Shim layer for org.apache.gobblin.source.workunit.ImmutableExtract + */ +public class ImmutableExtract extends org.apache.gobblin.source.workunit.ImmutableExtract { + + public ImmutableExtract(SourceState state, gobblin.source.workunit.Extract.TableType type, String namespace, String table) { + super(state, adaptTableType(type), namespace, table); + } + + public ImmutableExtract(Extract extract) { + super(extract); + } + + private static org.apache.gobblin.source.workunit.Extract.TableType adaptTableType(Extract.TableType type) { + switch (type) { + case SNAPSHOT_ONLY: return org.apache.gobblin.source.workunit.Extract.TableType.SNAPSHOT_ONLY; + case SNAPSHOT_APPEND: return org.apache.gobblin.source.workunit.Extract.TableType.SNAPSHOT_APPEND; + default: return org.apache.gobblin.source.workunit.Extract.TableType.APPEND_ONLY; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-api/src/main/java/gobblin/source/workunit/ImmutableWorkUnit.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/gobblin/source/workunit/ImmutableWorkUnit.java b/gobblin-api/src/main/java/gobblin/source/workunit/ImmutableWorkUnit.java new file mode 100644 index 0000000..b4c800a --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/source/workunit/ImmutableWorkUnit.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.source.workunit; + +/*** + * Shim layer for org.apache.gobblin.source.workunit.ImmutableWorkUnit + */ +public class ImmutableWorkUnit extends org.apache.gobblin.source.workunit.ImmutableWorkUnit { + public ImmutableWorkUnit(WorkUnit workUnit) { + super(workUnit); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-api/src/main/java/gobblin/source/workunit/MultiWorkUnit.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/gobblin/source/workunit/MultiWorkUnit.java b/gobblin-api/src/main/java/gobblin/source/workunit/MultiWorkUnit.java new file mode 100644 index 0000000..0b51347 --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/source/workunit/MultiWorkUnit.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.source.workunit; + +/*** + * Shim layer for org.apache.gobblin.source.workunit.MultiWorkUnit + */ +public class MultiWorkUnit extends org.apache.gobblin.source.workunit.MultiWorkUnit { + @Deprecated + public MultiWorkUnit() { + super(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-api/src/main/java/gobblin/source/workunit/WorkUnit.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/gobblin/source/workunit/WorkUnit.java b/gobblin-api/src/main/java/gobblin/source/workunit/WorkUnit.java new file mode 100644 index 0000000..3d594ea --- /dev/null +++ b/gobblin-api/src/main/java/gobblin/source/workunit/WorkUnit.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.source.workunit; + +import gobblin.configuration.SourceState; +import gobblin.source.extractor.WatermarkInterval; + +/*** + * Shim layer for org.apache.gobblin.source.workunit.WorkUnit + */ +public class WorkUnit extends org.apache.gobblin.source.workunit.WorkUnit { + @Deprecated + public WorkUnit() { + super(); + } + + @Deprecated + public WorkUnit(SourceState state, Extract extract) { + super(state, extract); + } + + @Deprecated + public WorkUnit(SourceState state, Extract extract, WatermarkInterval watermarkInterval) { + super(state, extract, watermarkInterval); + } + + public WorkUnit(Extract extract) { + super(extract); + } + + @Deprecated + public WorkUnit(WorkUnit other) { + super(other); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-core/src/main/java/gobblin/state/ConstructState.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/gobblin/state/ConstructState.java b/gobblin-core/src/main/java/gobblin/state/ConstructState.java new file mode 100644 index 0000000..0e791e0 --- /dev/null +++ b/gobblin-core/src/main/java/gobblin/state/ConstructState.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.state; + +import java.util.Properties; +import gobblin.configuration.State; + +/*** + * Shim layer for org.apache.gobblin.state.ConstructState + */ +public class ConstructState extends org.apache.gobblin.state.ConstructState { + public ConstructState() { + } + + public ConstructState(Properties properties) { + super(properties); + } + + public ConstructState(State otherState) { + super(otherState); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-data-management/src/main/java/gobblin/data/management/conversion/hive/source/HiveWorkUnit.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/gobblin/data/management/conversion/hive/source/HiveWorkUnit.java b/gobblin-data-management/src/main/java/gobblin/data/management/conversion/hive/source/HiveWorkUnit.java new file mode 100644 index 0000000..4f2e6b2 --- /dev/null +++ b/gobblin-data-management/src/main/java/gobblin/data/management/conversion/hive/source/HiveWorkUnit.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.data.management.conversion.hive.source; + +import gobblin.source.workunit.WorkUnit; + +/*** + * Shim layer for org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit + */ +public class HiveWorkUnit extends org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit { + public HiveWorkUnit() { + super(); + } + + public HiveWorkUnit(WorkUnit workunit) { + super(workunit); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-hive-registration/src/main/java/gobblin/hive/HiveRegProps.java ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/main/java/gobblin/hive/HiveRegProps.java b/gobblin-hive-registration/src/main/java/gobblin/hive/HiveRegProps.java new file mode 100644 index 0000000..63f0d00 --- /dev/null +++ b/gobblin-hive-registration/src/main/java/gobblin/hive/HiveRegProps.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.hive; + +import gobblin.configuration.State; + +/*** + * Shim layer for org.apache.gobblin.hive.HiveRegProps + */ +public class HiveRegProps extends org.apache.gobblin.hive.HiveRegProps { + public HiveRegProps(State props) { + super(props); + } + + public HiveRegProps(State props, State tableProps, State storageProps, State serdeProps) { + super(props, tableProps, storageProps, serdeProps); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java index 11154c1..21c81ec 100644 --- a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java +++ b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java @@ -119,26 +119,26 @@ public class FsStateStoreTest { // Disable backwards compatibility change, since we are doing a major version upgrade // .. and this is related to previous migration. -// @Test -// public void testBackwardsCompat() throws IOException { -// // Tests with a state store that was saved before the WritableShim changes -// Config bwConfig = ConfigFactory.load(config); -// URL path = getClass().getResource("/backwardsCompatTestStore"); -// Assert.assertNotNull(path); -// -// bwConfig = bwConfig.withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, -// ConfigValueFactory.fromAnyRef(path.toString())); -// -// StateStore<State> bwStateStore = stateStoreFactory.createStateStore(bwConfig, State.class); -// Assert.assertTrue(bwStateStore.exists("testStore", "testTable")); -// -// List<State> states = bwStateStore.getAll("testStore", "testTable"); -// Assert.assertEquals(states.size(), 3); -// -// Assert.assertEquals(states.get(0).getProp("k1"), "v1"); -// Assert.assertEquals(states.get(1).getProp("k2"), "v2"); -// Assert.assertEquals(states.get(2).getProp("k3"), "v3"); -// } + @Test + public void testBackwardsCompat() throws IOException { + // Tests with a state store that was saved before the WritableShim changes + Config bwConfig = ConfigFactory.load(config); + URL path = getClass().getResource("/backwardsCompatTestStore"); + Assert.assertNotNull(path); + + bwConfig = bwConfig.withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, + ConfigValueFactory.fromAnyRef(path.toString())); + + StateStore<State> bwStateStore = stateStoreFactory.createStateStore(bwConfig, State.class); + Assert.assertTrue(bwStateStore.exists("testStore", "testTable")); + + List<State> states = bwStateStore.getAll("testStore", "testTable"); + Assert.assertEquals(states.size(), 3); + + Assert.assertEquals(states.get(0).getProp("k1"), "v1"); + Assert.assertEquals(states.get(1).getProp("k2"), "v2"); + Assert.assertEquals(states.get(2).getProp("k3"), "v3"); + } @AfterClass public void tearDown() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-runtime/src/main/java/gobblin/runtime/CheckpointableWatermarkState.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/gobblin/runtime/CheckpointableWatermarkState.java b/gobblin-runtime/src/main/java/gobblin/runtime/CheckpointableWatermarkState.java new file mode 100644 index 0000000..bf685e8 --- /dev/null +++ b/gobblin-runtime/src/main/java/gobblin/runtime/CheckpointableWatermarkState.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.runtime; + +import com.google.gson.Gson; +import gobblin.source.extractor.CheckpointableWatermark; + +/*** + * Shim layer for org.apache.gobblin.runtime.CheckpointableWatermarkState + */ +public class CheckpointableWatermarkState extends org.apache.gobblin.runtime.CheckpointableWatermarkState { + public CheckpointableWatermarkState(CheckpointableWatermark watermark, Gson gson) { + super(watermark, gson); + } + + public CheckpointableWatermarkState() { + super(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-runtime/src/main/java/gobblin/runtime/JobState.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/gobblin/runtime/JobState.java b/gobblin-runtime/src/main/java/gobblin/runtime/JobState.java new file mode 100644 index 0000000..653a8d8 --- /dev/null +++ b/gobblin-runtime/src/main/java/gobblin/runtime/JobState.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package gobblin.runtime; + +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.gobblin.configuration.State; + +/*** + * Shim layer for org.apache.gobblin.runtime.JobState + */ +public class JobState extends org.apache.gobblin.runtime.JobState { + // Necessary for serialization/deserialization + public JobState() { + } + + public JobState(String jobName, String jobId) { + super(jobName, jobId); + } + + public JobState(State properties, Map<String, DatasetState> previousDatasetStates, String jobName, + String jobId) { + super(properties, adaptDatasetStateMap(previousDatasetStates), jobName, jobId); + } + + private static Map<String, org.apache.gobblin.runtime.JobState.DatasetState> adaptDatasetStateMap( + Map<String, DatasetState> previousDatasetStates) { + + return previousDatasetStates.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> new org.apache.gobblin.runtime.JobState.DatasetState(e.getValue().getJobName(), e.getValue().getId()))); + } + + /*** + * Shim layer for org.apache.gobblin.runtime.JobState.DatasetState + */ + public static class DatasetState extends org.apache.gobblin.runtime.JobState.DatasetState { + + // For serialization/deserialization + public DatasetState() { + super(); + } + + public DatasetState(String jobName, String jobId) { + super(jobName, jobId); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-runtime/src/main/java/gobblin/runtime/TaskState.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/gobblin/runtime/TaskState.java b/gobblin-runtime/src/main/java/gobblin/runtime/TaskState.java new file mode 100644 index 0000000..48f6e3e --- /dev/null +++ b/gobblin-runtime/src/main/java/gobblin/runtime/TaskState.java @@ -0,0 +1,5 @@ +package gobblin.runtime; + +public class TaskState extends org.apache.gobblin.runtime.TaskState { + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/886c6e40/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/util/JobStateToJsonConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/util/JobStateToJsonConverterTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/util/JobStateToJsonConverterTest.java index 200fda7..e4bf4f9 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/util/JobStateToJsonConverterTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/util/JobStateToJsonConverterTest.java @@ -36,21 +36,21 @@ public class JobStateToJsonConverterTest { private final String TEST_STORE = "store/"; // Disable test for now. It will be re-enabled after we have a current.jst with new class name states -// @Test -// public void testJsonKeepConfig() -// throws IOException { -// String stateStorePath = getClass().getClassLoader().getResource(TEST_STORE).getPath(); -// boolean keepConfig = true; -// JobStateToJsonConverter converter = new JobStateToJsonConverter(new Properties(), stateStorePath, keepConfig); -// -// StringWriter stringWriter = new StringWriter(); -// converter.convert(TEST_JOB, stringWriter); -// -// JsonObject json = new JsonParser().parse(new JsonReader(new StringReader(stringWriter.toString()))).getAsJsonObject(); -// -// Assert.assertNotNull(json.get(PROPERTIES)); -// for (JsonElement taskState: json.get(TASK_STATES).getAsJsonArray()) { -// Assert.assertNotNull(taskState.getAsJsonObject().get(PROPERTIES)); -// } -// } + @Test + public void testJsonKeepConfig() + throws IOException { + String stateStorePath = getClass().getClassLoader().getResource(TEST_STORE).getPath(); + boolean keepConfig = true; + JobStateToJsonConverter converter = new JobStateToJsonConverter(new Properties(), stateStorePath, keepConfig); + + StringWriter stringWriter = new StringWriter(); + converter.convert(TEST_JOB, stringWriter); + + JsonObject json = new JsonParser().parse(new JsonReader(new StringReader(stringWriter.toString()))).getAsJsonObject(); + + Assert.assertNotNull(json.get(PROPERTIES)); + for (JsonElement taskState: json.get(TASK_STATES).getAsJsonArray()) { + Assert.assertNotNull(taskState.getAsJsonObject().get(PROPERTIES)); + } + } }
