Repository: incubator-gobblin Updated Branches: refs/heads/master 58f00f019 -> 709b0af99
[GOBBLIN-634] Add requester information to flowconfigs Closes #2504 from kyuamazon/requester Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/709b0af9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/709b0af9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/709b0af9 Branch: refs/heads/master Commit: 709b0af9908c30e2d4d1434f3ff52387326f9bdd Parents: 58f00f0 Author: Kuai Yu <[email protected]> Authored: Mon Nov 26 18:55:57 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Mon Nov 26 18:55:57 2018 -0800 ---------------------------------------------------------------------- .../apache/gobblin/service/FlowConfigTest.java | 9 ++- .../gobblin/service/FlowConfigsResource.java | 54 ++++++++----- .../gobblin/service/NoopRequesterService.java | 40 +++++++++ .../gobblin/service/RequesterService.java | 71 ++++++++++++++++ .../gobblin/service/ServiceRequester.java | 61 ++++++++++++++ .../service/ServiceRequesterSerDerTest.java | 85 ++++++++++++++++++++ .../modules/core/GobblinServiceManager.java | 17 +++- 7 files changed, 310 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/709b0af9/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java index 09b8be5..3a15cb4 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java @@ -82,10 +82,15 @@ public class FlowConfigTest { Injector injector = Guice.createInjector(new Module() { @Override public void configure(Binder binder) { - binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named(FlowConfigsResource.FLOW_CONFIG_GENERATOR_INJECT_NAME)).toInstance(new FlowConfigResourceLocalHandler(flowCatalog)); + binder.bind(FlowConfigsResourceHandler.class) + .annotatedWith(Names.named(FlowConfigsResource.INJECT_FLOW_CONFIG_RESOURCE_HANDLER)) + .toInstance(new FlowConfigResourceLocalHandler(flowCatalog)); + // indicate that we are in unit testing since the resource is being blocked until flow catalog changes have // been made - binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE); + binder.bindConstant().annotatedWith(Names.named(FlowConfigsResource.INJECT_READY_TO_USE)).to(Boolean.TRUE); + binder.bind(RequesterService.class) + .annotatedWith(Names.named(FlowConfigsResource.INJECT_REQUESTER_SERVICE)).toInstance(new NoopRequesterService(config)); } }); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/709b0af9/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java index 1d0aa20..ab29c95 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java @@ -17,6 +17,8 @@ package org.apache.gobblin.service; +import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -25,37 +27,43 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableSet; -import com.google.inject.Inject; -import com.google.inject.name.Named; +import javax.inject.Inject; +import javax.inject.Named; + import com.linkedin.restli.common.ComplexResourceKey; import com.linkedin.restli.common.EmptyRecord; +import com.linkedin.restli.common.HttpStatus; import com.linkedin.restli.server.CreateResponse; import com.linkedin.restli.server.UpdateResponse; import com.linkedin.restli.server.annotations.RestLiCollection; import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate; - /** * Resource for handling flow configuration requests */ @RestLiCollection(name = "flowconfigs", namespace = "org.apache.gobblin.service", keyName = "id") public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, EmptyRecord, FlowConfig> { private static final Logger LOG = LoggerFactory.getLogger(FlowConfigsResource.class); - public static final String FLOW_CONFIG_GENERATOR_INJECT_NAME = "flowConfigsResourceHandler"; - private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store"); + public static final String INJECT_FLOW_CONFIG_RESOURCE_HANDLER = "flowConfigsResourceHandler"; + public static final String INJECT_REQUESTER_SERVICE = "requesterService"; + public static final String INJECT_READY_TO_USE = "readToUse"; - @edu.umd.cs.findbugs.annotations.SuppressWarnings("MS_SHOULD_BE_FINAL") - public static FlowConfigsResourceHandler global_flowConfigsResourceHandler = null; + private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store"); @Inject - @Named(FLOW_CONFIG_GENERATOR_INJECT_NAME) + @Named(INJECT_FLOW_CONFIG_RESOURCE_HANDLER) private FlowConfigsResourceHandler flowConfigsResourceHandler; + // For getting who sends the request + @Inject + @Named(INJECT_REQUESTER_SERVICE) + private RequesterService requesterService; + // For blocking use of this resource until it is ready @Inject - @Named("readyToUse") - private Boolean readyToUse = Boolean.FALSE; + @Named(INJECT_READY_TO_USE) + private Boolean readyToUse; public FlowConfigsResource() { } @@ -70,7 +78,7 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt String flowGroup = key.getKey().getFlowGroup(); String flowName = key.getKey().getFlowName(); FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName); - return this.getFlowConfigResourceHandler().getFlowConfig(flowId); + return this.flowConfigsResourceHandler.getFlowConfig(flowId); } /** @@ -80,7 +88,17 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt */ @Override public CreateResponse create(FlowConfig flowConfig) { - return this.getFlowConfigResourceHandler().createFlowConfig(flowConfig); + List<ServiceRequester> requestorList = this.requesterService.findRequesters(this); + + try { + String serialized = this.requesterService.serialize(requestorList); + flowConfig.getProperties().put(RequesterService.REQUESTER_LIST, serialized); + LOG.info("Rest requester list is " + serialized); + } catch (IOException e) { + throw new FlowConfigLoggedException(HttpStatus.S_401_UNAUTHORIZED, + "cannot get who is the requester", e); + } + return this.flowConfigsResourceHandler.createFlowConfig(flowConfig); } /** @@ -95,7 +113,7 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt String flowGroup = key.getKey().getFlowGroup(); String flowName = key.getKey().getFlowName(); FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName); - return this.getFlowConfigResourceHandler().updateFlowConfig(flowId, flowConfig); + return this.flowConfigsResourceHandler.updateFlowConfig(flowId, flowConfig); } /** @@ -108,15 +126,7 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt String flowGroup = key.getKey().getFlowGroup(); String flowName = key.getKey().getFlowName(); FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName); - return this.getFlowConfigResourceHandler().deleteFlowConfig(flowId, getHeaders()); - } - - private FlowConfigsResourceHandler getFlowConfigResourceHandler() { - if (global_flowConfigsResourceHandler != null) { - return global_flowConfigsResourceHandler; - } - - return flowConfigsResourceHandler; + return this.flowConfigsResourceHandler.deleteFlowConfig(flowId, getHeaders()); } private Properties getHeaders() { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/709b0af9/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopRequesterService.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopRequesterService.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopRequesterService.java new file mode 100644 index 0000000..1b7c082 --- /dev/null +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopRequesterService.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service; + +import java.util.List; + +import com.google.common.collect.Lists; +import com.linkedin.restli.server.resources.BaseResource; +import com.typesafe.config.Config; + + +/** + * Default requester service which does not track any requester information. + */ +public class NoopRequesterService extends RequesterService { + + public NoopRequesterService(Config config) { + super(config); + } + + @Override + public List<ServiceRequester> findRequesters(BaseResource resource) { + return Lists.newArrayList(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/709b0af9/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/RequesterService.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/RequesterService.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/RequesterService.java new file mode 100644 index 0000000..1fac68a --- /dev/null +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/RequesterService.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service; + +import java.io.IOException; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.util.List; +import java.util.Base64; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; +import com.linkedin.restli.server.resources.BaseResource; +import com.typesafe.config.Config; + +/** + * Use this class to get who sends the request. + */ +public abstract class RequesterService { + + protected Config sysConfig; + + public RequesterService(Config config) { + sysConfig = config; + } + + public static final String REQUESTER_LIST = "gobblin.service.requester.list"; + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * <p> This implementation converts a given list to a json string. + * Due to json string may have reserved keyword that can confuse + * {@link Config}, we first use Base64 to encode the json string, + * then use URL encoding to remove characters like '+,/,='. + */ + public String serialize(List<ServiceRequester> requestersList) throws IOException { + String arrayToJson = objectMapper.writeValueAsString(requestersList); + String encodedString = Base64.getEncoder().encodeToString(arrayToJson.getBytes("UTF-8")); + return URLEncoder.encode(encodedString, "UTF-8"); + } + + /** + * <p> This implementation decode a given string encoded by + * {@link #serialize(List)}. + */ + public List<ServiceRequester> deserialize(String encodedString) throws IOException { + String urlDecoded = URLDecoder.decode(encodedString, "UTF-8"); + byte[] decodedBytes = Base64.getDecoder().decode(urlDecoded); + String serialized = new String(decodedBytes, "UTF-8"); + TypeReference<List<ServiceRequester>> mapType = new TypeReference<List<ServiceRequester>>() {}; + List<ServiceRequester> jsonToPersonList = objectMapper.readValue(serialized, mapType); + return jsonToPersonList; + } + + protected abstract List<ServiceRequester> findRequesters(BaseResource resource); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/709b0af9/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/ServiceRequester.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/ServiceRequester.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/ServiceRequester.java new file mode 100644 index 0000000..39345ac --- /dev/null +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/ServiceRequester.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service; + +import java.util.HashMap; +import java.util.Map; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; + +/** + * A {@code ServiceRequester} represents who sends a request + * via a rest api. The requester have multiple attributes. + * + * 'name' indicates who is the sender. + * 'type' indicates if the sender is a user or a group or a specific application. + * 'from' indicates where this sender information is extracted. + * + * Please note that 'name' should be unique for the same 'type' of requester(s). + */ +@Getter +@Setter +@EqualsAndHashCode +public class ServiceRequester { + private String name; // requester name + private String type; // requester can be user name, service name, group name, etc. + private String from; // the location or context where this requester info is obtained + private Map<String, String> properties = new HashMap<>(); // additional information for future expansion + + /* + * Default constructor is required for deserialization from json + */ + public ServiceRequester() { + } + + public ServiceRequester(String name, String type, String from) { + this.name = name; + this.type = type; + this.from = from; + } + + public String toString() { + return "[name : " + name + " type : " + type + " from : "+ from + " additional : " + properties + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/709b0af9/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/ServiceRequesterSerDerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/ServiceRequesterSerDerTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/ServiceRequesterSerDerTest.java new file mode 100644 index 0000000..5140a12 --- /dev/null +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/ServiceRequesterSerDerTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.util.ConfigUtils; + + +@Test +public class ServiceRequesterSerDerTest { + + public void testSerDerWithEmptyRequester() throws IOException { + List<ServiceRequester> list = new ArrayList<>(); + + RequesterService rs = new NoopRequesterService(ConfigBuilder.create().build()); + String serialize = rs.serialize(list); + Properties props = new Properties(); + + props.put(RequesterService.REQUESTER_LIST, serialize); + + Config initConfig = ConfigBuilder.create().build(); + Config config = initConfig.withFallback(ConfigFactory.parseString(props.toString()).resolve()); + + Properties props2 = ConfigUtils.configToProperties(config); + String serialize2 = props2.getProperty(RequesterService.REQUESTER_LIST); + + Assert.assertTrue(serialize.equals(serialize2)); + List<ServiceRequester> list2 = rs.deserialize(serialize); + Assert.assertTrue(list.equals(list2)); + } + + public void testSerDerWithConfig() throws IOException { + ServiceRequester sr1 = new ServiceRequester("kafkaetl", "user", "dv"); + ServiceRequester sr2 = new ServiceRequester("gobblin", "group", "dv"); + ServiceRequester sr3 = new ServiceRequester("crm-backend", "service", "cert"); + + List<ServiceRequester> list = new ArrayList<>(); + sr1.getProperties().put("customKey", "${123}"); + list.add(sr1); + list.add(sr2); + list.add(sr3); + + RequesterService rs = new NoopRequesterService(ConfigBuilder.create().build()); + String serialize = rs.serialize(list); + Properties props = new Properties(); + + props.put(RequesterService.REQUESTER_LIST, serialize); + + Config initConfig = ConfigBuilder.create().build(); + Config config = initConfig.withFallback(ConfigFactory.parseString(props.toString()).resolve()); + + Properties props2 = ConfigUtils.configToProperties(config); + String serialize2 = props2.getProperty(RequesterService.REQUESTER_LIST); + + Assert.assertTrue(serialize.equals(serialize2)); + List<ServiceRequester> list2 = rs.deserialize(serialize); + Assert.assertTrue(list.equals(list2)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/709b0af9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index edb647f..4091d21 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -83,6 +83,8 @@ import org.apache.gobblin.service.FlowConfigsResource; import org.apache.gobblin.service.FlowConfigsResourceHandler; import org.apache.gobblin.service.FlowConfigsV2Resource; import org.apache.gobblin.service.FlowId; +import org.apache.gobblin.service.NoopRequesterService; +import org.apache.gobblin.service.RequesterService; import org.apache.gobblin.service.Schedule; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.orchestration.DagManager; @@ -250,9 +252,18 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri Injector injector = Guice.createInjector(new Module() { @Override public void configure(Binder binder) { - binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named(FlowConfigsResource.FLOW_CONFIG_GENERATOR_INJECT_NAME)).toInstance(GobblinServiceManager.this.resourceHandler); - binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named(FlowConfigsV2Resource.FLOW_CONFIG_GENERATOR_INJECT_NAME)).toInstance(GobblinServiceManager.this.v2ResourceHandler); - binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE); + binder.bind(FlowConfigsResourceHandler.class) + .annotatedWith(Names.named(FlowConfigsResource.INJECT_FLOW_CONFIG_RESOURCE_HANDLER)) + .toInstance(GobblinServiceManager.this.resourceHandler); + binder.bind(FlowConfigsResourceHandler.class) + .annotatedWith(Names.named(FlowConfigsV2Resource.FLOW_CONFIG_GENERATOR_INJECT_NAME)) + .toInstance(GobblinServiceManager.this.v2ResourceHandler); + binder.bindConstant() + .annotatedWith(Names.named(FlowConfigsResource.INJECT_READY_TO_USE)) + .to(Boolean.TRUE); + binder.bind(RequesterService.class) + .annotatedWith(Names.named(FlowConfigsResource.INJECT_REQUESTER_SERVICE)) + .toInstance(new NoopRequesterService(config)); } }); this.restliServer = EmbeddedRestliServer.builder()
