igarashitm commented on a change in pull request #4447:
URL: https://github.com/apache/camel/pull/4447#discussion_r505463038



##########
File path: 
components/camel-atlasmap/src/main/java/org/apache/camel/component/atlasmap/AtlasMapEndpoint.java
##########
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atlasmap;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import io.atlasmap.api.AtlasContext;
+import io.atlasmap.api.AtlasContextFactory;
+import io.atlasmap.api.AtlasException;
+import io.atlasmap.api.AtlasSession;
+import io.atlasmap.core.DefaultAtlasContextFactory;
+import io.atlasmap.v2.Audit;
+import io.atlasmap.v2.DataSource;
+import io.atlasmap.v2.DataSourceType;
+import org.apache.camel.Category;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.component.ResourceEndpoint;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.MessageHelper;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.atlasmap.api.AtlasContextFactory.Format.ADM;
+import static io.atlasmap.api.AtlasContextFactory.Format.JSON;
+
+/**
+ * Transforms the message using an AtlasMap transformation.
+ */
+@UriEndpoint(firstVersion = "3.7.0", scheme = "atlasmap", title = "AtlasMap", 
syntax = "atlasmap:resourceUri",
+             producerOnly = true, category = { Category.TRANSFORMATION })
+public class AtlasMapEndpoint extends ResourceEndpoint {
+
+    public static final String CONTENT_TYPE_JSON = "application/json";
+    public static final String CONTENT_TYPE_XML = "application/xml";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasMapEndpoint.class);
+    private AtlasContextFactory atlasContextFactory;
+    private AtlasContext atlasContext;
+
+    @UriParam(label = "advanced")
+    private String propertiesFile;
+    @UriParam
+    private String sourceMapName;
+    @UriParam
+    private String targetMapName;
+    @UriParam(defaultValue = "MAP")
+    private TargetMapMode targetMapMode = TargetMapMode.MAP;
+
+    public enum TargetMapMode {
+        MAP,
+        MESSAGE_HEADER,
+        EXCHANGE_PROPERTY;
+    }
+
+    public AtlasMapEndpoint(String uri, AtlasMapComponent component, String 
resourceUri) {
+        super(uri, component, resourceUri);
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public ExchangePattern getExchangePattern() {
+        return ExchangePattern.InOut;
+    }
+
+    @Override
+    protected String createEndpointUri() {
+        return "atlasmap:" + getResourceUri();
+    }
+
+    public AtlasContextFactory getAtlasContextFactory() {
+        return this.atlasContextFactory;
+    }
+
+    public void setAtlasContextFactory(AtlasContextFactory 
atlasContextFactory) {
+        this.atlasContextFactory = atlasContextFactory;
+    }
+
+    public AtlasContext getAtlasContext() {
+        return this.atlasContext;
+    }
+
+    public void setAtlasContext(AtlasContext atlasContext) {
+        this.atlasContext = atlasContext;
+    }
+
+    /**
+     * The URI of the properties file which is used for AtlasContextFactory 
initialization.
+     * 
+     * @param file property file path
+     */
+    public void setPropertiesFile(String file) {
+        propertiesFile = file;
+    }
+
+    public String getPropertiesFile() {
+        return propertiesFile;
+    }
+
+    /**
+     * The Exchange property name for a source message map which hold 
<code>java.util.Map&lt;String, Message&gt;</code>
+     * where the key is AtlasMap Document ID. AtlasMap consumes Message bodies 
as source documents, as well as message
+     * headers as source properties where the scope equals to Document ID.
+     * 
+     * @param name Exchange property name for source map
+     */
+    public void setSourceMapName(String name) {
+        this.sourceMapName = name;
+    }
+
+    public String getSourceMapName() {
+        return this.sourceMapName;
+    }
+
+    /**
+     * The Exchange property name for a target document map which hold 
<code>java.util.Map&lt;String, Object&gt;</code>
+     * where the key is AtlasMap Document ID. AtlasMap populates multiple 
target documents into this map.
+     * 
+     * @param name Exchange property name for target map
+     */
+    public void setTargetMapName(String name) {
+        this.targetMapName = name;
+    }
+
+    public String getTargetMapName() {
+        return this.targetMapName;
+    }
+
+    /**
+     * {@link TargetMapMode} enum value to specify how multiple target 
documents are delivered if exist.
+     * <ul>
+     * <li>'MAP': Stores them into a java.util.Map, and the java.util.Map is 
set to an exchange" property if
+     * 'targetMapName' is specified, otherwise message body.</li>"
+     * <li>'MESSAGE_HEADER': Stores them into message headers.</li>"
+     * <li>'EXCHANGE_PROPERTY': Stores them into exchange properties.</li>
+     * </ul>
+     * ")
+     * 
+     * @param mode {@link TargetMapMode}
+     */
+    public void setTargetMapMode(TargetMapMode mode) {
+        this.targetMapMode = mode;
+    }
+
+    public TargetMapMode getTargetMapMode() {
+        return this.targetMapMode;
+    }
+
+    public AtlasMapEndpoint findOrCreateEndpoint(String uri, String 
newResourceUri) {
+        String newUri = uri.replace(getResourceUri(), newResourceUri);
+        log.debug("Getting endpoint with URI: {}", newUri);
+        return getCamelContext().getEndpoint(newUri, AtlasMapEndpoint.class);
+    }
+
+    @Override
+    protected void onExchange(Exchange exchange) throws Exception {
+        Message incomingMessage = exchange.getIn();
+        String newResourceUri = 
incomingMessage.getHeader(AtlasMapConstants.ATLAS_RESOURCE_URI, String.class);
+        if (newResourceUri != null) {
+            incomingMessage.removeHeader(AtlasMapConstants.ATLAS_RESOURCE_URI);
+
+            log.debug("{} set to {} creating new endpoint to handle exchange", 
AtlasMapConstants.ATLAS_RESOURCE_URI,
+                    newResourceUri);
+            AtlasMapEndpoint newEndpoint = 
findOrCreateEndpoint(getEndpointUri(), newResourceUri);
+            newEndpoint.onExchange(exchange);
+            return;
+        }
+
+        AtlasSession atlasSession = 
getOrCreateAtlasContext(incomingMessage).createSession();
+        populateSourceDocuments(exchange, atlasSession);
+        atlasSession.getAtlasContext().process(atlasSession);
+
+        List<Audit> errors = new ArrayList<>();
+        for (Audit audit : atlasSession.getAudits().getAudit()) {
+            switch (audit.getStatus()) {
+                case ERROR:
+                    errors.add(audit);
+                    break;
+                case WARN:
+                    LOG.warn("{}: Document='{}(ID:{})', path='{}'",
+                            audit.getMessage(), audit.getDocName(), 
audit.getDocId(), audit.getPath());
+                    break;
+                default:
+                    LOG.info("{}: Document='{}(ID:{})', path='{}'",
+                            audit.getMessage(), audit.getDocName(), 
audit.getDocId(), audit.getPath());
+            }
+        }
+        if (!errors.isEmpty()) {
+            StringBuilder buf = new StringBuilder("Errors: ");
+            errors.stream().forEach(a -> buf.append(
+                    String.format("[%s: Document='{}(ID:{})', path='%s'], ",

Review comment:
       Doh good catch, thanks! fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to